This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 09765db feat: Bump hive_metastore to use pure rust thrift impl `volo` (#174)
09765db is described below
commit 09765db611a65a21b88e839d781780c75924e560
Author: Xuanwo <[email protected]>
AuthorDate: Mon Feb 5 09:45:23 2024 +0800
feat: Bump hive_metastore to use pure rust thrift impl `volo` (#174)
---
Cargo.toml | 3 +-
crates/catalog/hms/Cargo.toml | 10 ++----
crates/catalog/hms/src/catalog.rs | 76 +++++++++++++++++++--------------------
crates/catalog/hms/src/utils.rs | 17 ++++++++-
4 files changed, 59 insertions(+), 47 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5c01a90..c2388b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -67,5 +67,6 @@ typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = "1.6.1"
-
+volo-thrift = "0.9.2"
+hive_metastore = "0.0.2"
tera = "1"
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 693d4e9..e03733b 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -28,13 +28,9 @@ license = { workspace = true }
keywords = ["iceberg", "hive", "catalog"]
[dependencies]
+anyhow = { workspace = true }
async-trait = { workspace = true }
-hive_metastore = "0.0.1"
+hive_metastore = { workspace = true }
iceberg = { workspace = true }
-# the thrift upstream suffered from no regular rust release.
-#
-# [test-rs](https://github.com/tent-rs) is an organization that helps resolves this
-# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift
-# crate, built from the thrift upstream with only version bumped.
-thrift = { package = "tent-thrift", version = "0.18.1" }
typed-builder = { workspace = true }
+volo-thrift = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index 2b1fe2c..f3eab62 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -17,16 +17,16 @@
use super::utils::*;
use async_trait::async_trait;
-use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient};
+use hive_metastore::ThriftHiveMetastoreClient;
+use hive_metastore::ThriftHiveMetastoreClientBuilder;
use iceberg::table::Table;
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
+use iceberg::{
+ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
+ TableIdent,
+};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
-use std::sync::{Arc, Mutex};
-use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
-use thrift::transport::{
- ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf,
-};
+use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
/// Hive metastore Catalog configuration.
@@ -35,24 +35,7 @@ pub struct HmsCatalogConfig {
address: String,
}
-/// TODO: We only support binary protocol for now.
-type HmsClientType = ThriftHiveMetastoreSyncClient<
- TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>,
- TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>,
->;
-
-/// # TODO
-///
-/// we are using the same connection everytime, we should support connection
-/// pool in the future.
-struct HmsClient(Arc<Mutex<HmsClientType>>);
-
-impl HmsClient {
- fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) -> Result<T> {
- let mut client = self.0.lock().unwrap();
- f(&mut client).map_err(from_thrift_error)
- }
-}
+struct HmsClient(ThriftHiveMetastoreClient);
/// Hive metastore Catalog.
pub struct HmsCatalog {
@@ -71,19 +54,29 @@ impl Debug for HmsCatalog {
impl HmsCatalog {
/// Create a new hms catalog.
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
- let mut channel = thrift::transport::TTcpChannel::new();
- channel
- .open(config.address.as_str())
- .map_err(from_thrift_error)?;
- let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?;
- let i_chan = TBufferedReadTransport::new(i_chan);
- let o_chan = TBufferedWriteTransport::new(o_chan);
- let i_proto = TBinaryInputProtocol::new(i_chan, true);
- let o_proto = TBinaryOutputProtocol::new(o_chan, true);
- let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto);
+ let address = config
+ .address
+ .as_str()
+ .to_socket_addrs()
+ .map_err(from_io_error)?
+ .next()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("invalid address: {}", config.address),
+ )
+ })?;
+
+ let client = ThriftHiveMetastoreClientBuilder::new("hms")
+ .address(address)
+ // Framed thrift rpc is not enabled by default in HMS, we use
+ // buffered instead.
+
.make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
+ .build();
+
Ok(Self {
config,
- client: HmsClient(Arc::new(Mutex::new(client))),
+ client: HmsClient(client),
})
}
}
@@ -103,10 +96,17 @@ impl Catalog for HmsCatalog {
let dbs = if parent.is_some() {
return Ok(vec![]);
} else {
- self.client.call(|client| client.get_all_databases())?
+ self.client
+ .0
+ .get_all_databases()
+ .await
+ .map_err(from_thrift_error)?
};
- Ok(dbs.into_iter().map(NamespaceIdent::new).collect())
+ Ok(dbs
+ .into_iter()
+ .map(|v| NamespaceIdent::new(v.into()))
+ .collect())
}
async fn create_namespace(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 0daa52a..1b0ff33 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -15,13 +15,28 @@
// specific language governing permissions and limitations
// under the License.
+use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
+use std::fmt::Debug;
+use std::io;
/// Format a thrift error into iceberg error.
-pub fn from_thrift_error(error: thrift::Error) -> Error {
+pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
+where
+ T: Debug,
+{
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting thrift error".to_string(),
)
+ .with_source(anyhow!("thrift error: {:?}", error))
+}
+
+/// Format an io error into iceberg error.
+pub fn from_io_error(error: io::Error) -> Error {
+ Error::new(
+ ErrorKind::Unexpected,
+ "operation failed for hitting io error".to_string(),
+ )
.with_source(error)
}