This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new eb33e90  feat: Add hms catalog layout (#112)
eb33e90 is described below

commit eb33e9084c0c8bdc0077cb689828766a8c740f27
Author: Xuanwo <[email protected]>
AuthorDate: Thu Dec 7 19:29:38 2023 +0800

    feat: Add hms catalog layout (#112)
    
    * feat: Add hms catalog layout
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Sort deps
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 crates/catalog/hms/Cargo.toml     |  39 +++++++++
 crates/catalog/hms/src/catalog.rs | 171 ++++++++++++++++++++++++++++++++++++++
 crates/catalog/hms/src/lib.rs     |  25 ++++++
 crates/catalog/hms/src/utils.rs   |  27 ++++++
 4 files changed, 262 insertions(+)

diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
new file mode 100644
index 0000000..61c03fd
--- /dev/null
+++ b/crates/catalog/hms/Cargo.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-hms"
+version = "0.1.0"
+edition = "2021"
+
+categories = ["database"]
+description = "Apache Iceberg Hive Metastore Catalog Support"
+repository = "https://github.com/apache/iceberg-rust"
+license = "Apache-2.0"
+keywords = ["iceberg", "hive", "catalog"]
+
+[dependencies]
+async-trait = { workspace = true }
+hive_metastore = "0.0.1"
+iceberg = { workspace = true }
+# The upstream thrift crate has not had regular Rust releases.
+#
+# [tent-rs](https://github.com/tent-rs) is an organization that helps resolve this
+# issue: [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift
+# crate, built from thrift upstream with only the version bumped.
+thrift = { package = "tent-thrift", version = "0.18.1" }
+typed-builder = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
new file mode 100644
index 0000000..2b1fe2c
--- /dev/null
+++ b/crates/catalog/hms/src/catalog.rs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::utils::*;
+use async_trait::async_trait;
+use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient};
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
+    TableIdent,
+};
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
+use thrift::transport::{
+    ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf,
+};
+use typed_builder::TypedBuilder;
+
+/// Connection settings used to build an [`HmsCatalog`].
+#[derive(TypedBuilder, Debug)]
+pub struct HmsCatalogConfig {
+    /// Hive metastore thrift endpoint, handed unchanged to `TTcpChannel::open`
+    /// (presumably a `host:port` string — confirm against the thrift API).
+    address: String,
+}
+
+/// Read side of the metastore connection: binary protocol over a buffered TCP read half.
+type HmsInputProtocol =
+    TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>;
+
+/// Write side of the metastore connection: binary protocol over a buffered TCP write half.
+type HmsOutputProtocol =
+    TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>;
+
+/// Concrete thrift client used to talk to the Hive metastore.
+///
+/// TODO: only the binary protocol is supported for now.
+type HmsClientType = ThriftHiveMetastoreSyncClient<HmsInputProtocol, HmsOutputProtocol>;
+
+/// Shared, mutex-guarded handle to the single thrift client used by [`HmsCatalog`].
+///
+/// # TODO
+///
+/// We reuse the same connection for every call; a connection pool should be supported in
+/// the future.
+struct HmsClient(Arc<Mutex<HmsClientType>>);
+
+impl HmsClient {
+    /// Run `f` against the shared client while holding its lock, converting thrift errors
+    /// into iceberg errors.
+    ///
+    /// Calls are serialized by the mutex: only one request uses the connection at a time.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::Unexpected`] error if the lock is poisoned. That happens when an
+    /// earlier call panicked while holding it, which may have left the connection in an
+    /// inconsistent state. Also returns an error if `f` itself fails with a thrift error.
+    fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) -> Result<T> {
+        // Report a poisoned lock as an error instead of panicking inside library code.
+        let mut client = self.0.lock().map_err(|err| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("hms client lock is poisoned: {err}"),
+            )
+        })?;
+        f(&mut client).map_err(from_thrift_error)
+    }
+}
+
+/// Iceberg catalog backed by a Hive metastore (HMS) thrift service.
+pub struct HmsCatalog {
+    /// Connection to the metastore shared by all catalog operations.
+    client: HmsClient,
+    /// Configuration the catalog was created with.
+    config: HmsCatalogConfig,
+}
+
+impl Debug for HmsCatalog {
+    /// Prints only the catalog configuration. `HmsClient` does not implement `Debug`, so the
+    /// client is left out and `finish_non_exhaustive` marks the omission with `..`.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut builder = f.debug_struct("HmsCatalog");
+        builder.field("config", &self.config);
+        builder.finish_non_exhaustive()
+    }
+}
+
+impl HmsCatalog {
+    /// Create a new hms catalog by connecting to the metastore at `config.address`.
+    ///
+    /// The TCP connection is opened immediately and wrapped in buffered transports that speak
+    /// the thrift binary protocol. All catalog operations then share this single connection.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the connection cannot be opened, or if its channel cannot be split
+    /// into read and write halves.
+    pub fn new(config: HmsCatalogConfig) -> Result<Self> {
+        let mut tcp = thrift::transport::TTcpChannel::new();
+        tcp.open(config.address.as_str()).map_err(from_thrift_error)?;
+        let (read_half, write_half) = tcp.split().map_err(from_thrift_error)?;
+
+        let input = TBinaryInputProtocol::new(TBufferedReadTransport::new(read_half), true);
+        let output = TBinaryOutputProtocol::new(TBufferedWriteTransport::new(write_half), true);
+        let thrift_client = ThriftHiveMetastoreSyncClient::new(input, output);
+
+        Ok(Self {
+            client: HmsClient(Arc::new(Mutex::new(thrift_client))),
+            config,
+        })
+    }
+}
+
+/// Hive metastore implementation of [`Catalog`].
+///
+/// Refer to <https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java>
+/// for implementation details.
+///
+/// NOTE: implemented methods reach the metastore through `HmsClient::call`, which runs the
+/// synchronous thrift client inline. Each request therefore blocks the calling executor
+/// thread until the metastore answers.
+#[async_trait]
+impl Catalog for HmsCatalog {
+    /// Lists the metastore's databases as single-level namespaces.
+    ///
+    /// HMS doesn't support nested namespaces, so an empty list is returned whenever `parent`
+    /// is provided.
+    ///
+    /// Aligned with the Java implementation:
+    /// <https://github.com/apache/iceberg/blob/9bd62f79f8cd973c39d14e89163cb1c707470ed2/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java#L305C26-L330>
+    async fn list_namespaces(
+        &self,
+        parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        if parent.is_some() {
+            return Ok(vec![]);
+        }
+
+        let databases = self.client.call(|client| client.get_all_databases())?;
+        Ok(databases.into_iter().map(NamespaceIdent::new).collect())
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        todo!()
+    }
+
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> Result<Table> {
+        todo!()
+    }
+
+    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
+        todo!()
+    }
+
+    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn stat_table(&self, _table: &TableIdent) -> Result<bool> {
+        todo!()
+    }
+
+    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
+        todo!()
+    }
+}
diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/lib.rs
new file mode 100644
index 0000000..b75e749
--- /dev/null
+++ b/crates/catalog/hms/src/lib.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg Hive Metastore Catalog implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+pub use catalog::*;
+
+mod utils;
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
new file mode 100644
index 0000000..0daa52a
--- /dev/null
+++ b/crates/catalog/hms/src/utils.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iceberg::{Error, ErrorKind};
+
+/// Format a thrift error into iceberg error.
+pub fn from_thrift_error(error: thrift::Error) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "operation failed for hitting thrift error".to_string(),
+    )
+    .with_source(error)
+}

Reply via email to