This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 09765db  feat: Bump hive_metastore to use pure rust thrift impl `volo` (#174)
09765db is described below

commit 09765db611a65a21b88e839d781780c75924e560
Author: Xuanwo <[email protected]>
AuthorDate: Mon Feb 5 09:45:23 2024 +0800

    feat: Bump hive_metastore to use pure rust thrift impl `volo` (#174)
---
 Cargo.toml                        |  3 +-
 crates/catalog/hms/Cargo.toml     | 10 ++----
 crates/catalog/hms/src/catalog.rs | 76 +++++++++++++++++++--------------------
 crates/catalog/hms/src/utils.rs   | 17 ++++++++-
 4 files changed, 59 insertions(+), 47 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5c01a90..c2388b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -67,5 +67,6 @@ typed-builder = "^0.18"
 url = "2"
 urlencoding = "2"
 uuid = "1.6.1"
-
+volo-thrift = "0.9.2"
+hive_metastore = "0.0.2"
 tera = "1"
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 693d4e9..e03733b 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -28,13 +28,9 @@ license = { workspace = true }
 keywords = ["iceberg", "hive", "catalog"]
 
 [dependencies]
+anyhow = { workspace = true }
 async-trait = { workspace = true }
-hive_metastore = "0.0.1"
+hive_metastore = { workspace = true }
 iceberg = { workspace = true }
-# the thrift upstream suffered from no regular rust release.
-#
-# [test-rs](https://github.com/tent-rs) is an organization that helps resolves 
this
-# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the 
thrift
-# crate, built from the thrift upstream with only version bumped.
-thrift = { package = "tent-thrift", version = "0.18.1" }
 typed-builder = { workspace = true }
+volo-thrift = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index 2b1fe2c..f3eab62 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -17,16 +17,16 @@
 
 use super::utils::*;
 use async_trait::async_trait;
-use hive_metastore::{TThriftHiveMetastoreSyncClient, 
ThriftHiveMetastoreSyncClient};
+use hive_metastore::ThriftHiveMetastoreClient;
+use hive_metastore::ThriftHiveMetastoreClientBuilder;
 use iceberg::table::Table;
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation, TableIdent};
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
-use std::sync::{Arc, Mutex};
-use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
-use thrift::transport::{
-    ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, 
WriteHalf,
-};
+use std::net::ToSocketAddrs;
 use typed_builder::TypedBuilder;
 
 /// Hive metastore Catalog configuration.
@@ -35,24 +35,7 @@ pub struct HmsCatalogConfig {
     address: String,
 }
 
-/// TODO: We only support binary protocol for now.
-type HmsClientType = ThriftHiveMetastoreSyncClient<
-    
TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>,
-    
TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>,
->;
-
-/// # TODO
-///
-/// we are using the same connection everytime, we should support connection
-/// pool in the future.
-struct HmsClient(Arc<Mutex<HmsClientType>>);
-
-impl HmsClient {
-    fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) 
-> Result<T> {
-        let mut client = self.0.lock().unwrap();
-        f(&mut client).map_err(from_thrift_error)
-    }
-}
+struct HmsClient(ThriftHiveMetastoreClient);
 
 /// Hive metastore Catalog.
 pub struct HmsCatalog {
@@ -71,19 +54,29 @@ impl Debug for HmsCatalog {
 impl HmsCatalog {
     /// Create a new hms catalog.
     pub fn new(config: HmsCatalogConfig) -> Result<Self> {
-        let mut channel = thrift::transport::TTcpChannel::new();
-        channel
-            .open(config.address.as_str())
-            .map_err(from_thrift_error)?;
-        let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?;
-        let i_chan = TBufferedReadTransport::new(i_chan);
-        let o_chan = TBufferedWriteTransport::new(o_chan);
-        let i_proto = TBinaryInputProtocol::new(i_chan, true);
-        let o_proto = TBinaryOutputProtocol::new(o_chan, true);
-        let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto);
+        let address = config
+            .address
+            .as_str()
+            .to_socket_addrs()
+            .map_err(from_io_error)?
+            .next()
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("invalid address: {}", config.address),
+                )
+            })?;
+
+        let client = ThriftHiveMetastoreClientBuilder::new("hms")
+            .address(address)
+            // Framed thrift rpc is not enabled by default in HMS, we use
+            // buffered instead.
+            
.make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
+            .build();
+
         Ok(Self {
             config,
-            client: HmsClient(Arc::new(Mutex::new(client))),
+            client: HmsClient(client),
         })
     }
 }
@@ -103,10 +96,17 @@ impl Catalog for HmsCatalog {
         let dbs = if parent.is_some() {
             return Ok(vec![]);
         } else {
-            self.client.call(|client| client.get_all_databases())?
+            self.client
+                .0
+                .get_all_databases()
+                .await
+                .map_err(from_thrift_error)?
         };
 
-        Ok(dbs.into_iter().map(NamespaceIdent::new).collect())
+        Ok(dbs
+            .into_iter()
+            .map(|v| NamespaceIdent::new(v.into()))
+            .collect())
     }
 
     async fn create_namespace(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 0daa52a..1b0ff33 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -15,13 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use anyhow::anyhow;
 use iceberg::{Error, ErrorKind};
+use std::fmt::Debug;
+use std::io;
 
 /// Format a thrift error into iceberg error.
-pub fn from_thrift_error(error: thrift::Error) -> Error {
+pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> 
Error
+where
+    T: Debug,
+{
     Error::new(
         ErrorKind::Unexpected,
         "operation failed for hitting thrift error".to_string(),
     )
+    .with_source(anyhow!("thrift error: {:?}", error))
+}
+
+/// Format an io error into iceberg error.
+pub fn from_io_error(error: io::Error) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "operation failed for hitting io error".to_string(),
+    )
     .with_source(error)
 }

Reply via email to