This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f17bf30  feat: Initial version of rest catalog. (#78)
f17bf30 is described below

commit f17bf305c9e3ea43addaeee51236ecf8ffd3629f
Author: Renjie Liu <[email protected]>
AuthorDate: Sun Oct 29 01:58:53 2023 +0800

    feat: Initial version of rest catalog. (#78)
    
    * feat: Add rest catalog
    
    * feat: Initial checkin of rest catalog
    
    * Add tests
    
    * Fix typo
    
    * Fix
    
    * Fix comments
    
    * Fix comment
    
    * Move rest catalog to another crate
    
    * Remove unused deps
    
    * Rename to iceberg-catalog-rest
    
    * Fix
    
    * Fix comments
    
    * Fix comments
---
 Cargo.toml                                  |   2 +-
 crates/{iceberg => catalog/rest}/Cargo.toml |  35 +-
 crates/catalog/rest/src/catalog.rs          | 887 ++++++++++++++++++++++++++++
 crates/catalog/rest/src/lib.rs              |  23 +
 crates/iceberg/Cargo.toml                   |   3 +
 crates/iceberg/src/catalog/mod.rs           |  47 +-
 crates/iceberg/src/error.rs                 |  12 +
 crates/iceberg/src/rest.rs                  |  22 +
 8 files changed, 998 insertions(+), 33 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 34eef8d..e8e0473 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,4 +17,4 @@
 
 [workspace]
 resolver = "2"
-members = ["crates/*"]
+members = ["crates/catalog/*", "crates/iceberg"]
diff --git a/crates/iceberg/Cargo.toml b/crates/catalog/rest/Cargo.toml
similarity index 65%
copy from crates/iceberg/Cargo.toml
copy to crates/catalog/rest/Cargo.toml
index 9f2d4a8..0531a51 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -16,45 +16,26 @@
 # under the License.
 
 [package]
-name = "iceberg"
+name = "iceberg-catalog-rest"
 version = "0.1.0"
 edition = "2021"
 
 categories = ["database"]
-description = "Apache Iceberg Rust implementation"
+description = "Apache Iceberg Rust REST API"
 repository = "https://github.com/apache/iceberg-rust"
 license = "Apache-2.0"
-keywords = ["iceberg"]
+keywords = ["iceberg", "rest", "catalog"]
 
 [dependencies]
-anyhow = "1.0.72"
-apache-avro = "0.16"
-arrow-arith = { version = ">=46" }
-arrow-array = { version = ">=46" }
-arrow-schema = { version = ">=46" }
 async-trait = "0.1"
-bimap = "0.6"
-bitvec = "1.0.1"
-chrono = "0.4"
-derive_builder = "0.12.0"
-either = "1"
-futures = "0.3"
-itertools = "0.11"
-lazy_static = "1"
-murmur3 = "0.5.2"
-once_cell = "1"
-opendal = "0.41"
-ordered-float = "4.0.0"
-rust_decimal = "1.31.0"
+iceberg = { path = "../../iceberg" }
+reqwest = { version = "^0.11", features = ["json"] }
 serde = { version = "^1.0", features = ["rc"] }
-serde_bytes = "0.11.8"
 serde_derive = "^1.0"
 serde_json = "^1.0"
-serde_repr = "0.1.16"
-url = "2"
-uuid = "1.4.1"
+typed-builder = "^0.17"
+urlencoding = "2"
 
 [dev-dependencies]
-pretty_assertions = "1.4.0"
-tempdir = "0.3"
+mockito = "^1"
 tokio = { version = "1", features = ["macros"] }
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
new file mode 100644
index 0000000..f267437
--- /dev/null
+++ b/crates/catalog/rest/src/catalog.rs
@@ -0,0 +1,887 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains rest catalog implementation.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
+use reqwest::{Client, Request};
+use serde::de::DeserializeOwned;
+use typed_builder::TypedBuilder;
+use urlencoding::encode;
+
+use iceberg::table::Table;
+use iceberg::Result;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, 
TableCreation, TableIdent,
+};
+
+use self::_serde::{
+    CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, 
ListTableResponse,
+    NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
+};
+
+/// Value sent in the `x-client-version` header of every request.
+const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
+/// Version of this crate as reported by Cargo at build time; used in the `User-Agent` header.
+const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
+/// Path segment placed between the catalog uri and every endpoint path.
+const PATH_V1: &str = "v1";
+
+/// Rest catalog configuration.
+#[derive(Debug, TypedBuilder)]
+pub struct RestCatalogConfig {
+    /// Base uri of the rest catalog server; every endpoint is built as `<uri>/v1/...`.
+    uri: String,
+    /// Optional warehouse, sent as the `warehouse` query parameter when fetching the
+    /// server side configuration.
+    #[builder(default, setter(strip_option))]
+    warehouse: Option<String>,
+
+    /// Catalog properties. After the catalog is created these are merged with the
+    /// `defaults` and `overrides` returned by the config endpoint.
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+impl RestCatalogConfig {
+    /// Builds `<uri>/v1/<parts...>`, joining all segments with `/`.
+    ///
+    /// Trailing slashes of the configured uri are dropped first, so that a uri such as
+    /// `http://host/` does not produce `http://host//v1/...`.
+    fn endpoint(&self, parts: &[&str]) -> String {
+        let mut segments = vec![self.uri.trim_end_matches('/'), PATH_V1];
+        segments.extend_from_slice(parts);
+        segments.join("/")
+    }
+
+    /// Endpoint used to fetch the server side catalog configuration.
+    fn config_endpoint(&self) -> String {
+        self.endpoint(&["config"])
+    }
+
+    /// Endpoint used to list and create namespaces.
+    fn namespaces_endpoint(&self) -> String {
+        self.endpoint(&["namespaces"])
+    }
+
+    /// Endpoint of a single namespace.
+    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
+        let namespace = ns.encode_in_url();
+        self.endpoint(&["namespaces", namespace.as_str()])
+    }
+
+    /// Endpoint used to list the tables of a namespace.
+    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
+        let namespace = ns.encode_in_url();
+        self.endpoint(&["namespaces", namespace.as_str(), "tables"])
+    }
+
+    /// Endpoint used to rename a table.
+    fn rename_table_endpoint(&self) -> String {
+        self.endpoint(&["tables", "rename"])
+    }
+
+    /// Endpoint of a single table; the table name is percent-encoded.
+    fn table_endpoint(&self, table: &TableIdent) -> String {
+        let namespace = table.namespace.encode_in_url();
+        let name = encode(&table.name);
+        self.endpoint(&["namespaces", namespace.as_str(), "tables", name.as_ref()])
+    }
+
+    /// Creates the http client used to talk to the rest catalog server.
+    ///
+    /// Every request carries a json content type, the `x-client-version` header and a
+    /// `User-Agent` naming this crate's version.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the underlying `reqwest` client can't be built.
+    fn try_create_rest_client(&self) -> Result<HttpClient> {
+        //TODO: We will add oauth, ssl config, sigv4 later
+        let user_agent = format!("iceberg-rs/{}", CARGO_PKG_VERSION);
+        let headers = HeaderMap::from_iter([
+            (
+                header::CONTENT_TYPE,
+                HeaderValue::from_static("application/json"),
+            ),
+            (
+                HeaderName::from_static("x-client-version"),
+                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
+            ),
+            (
+                header::USER_AGENT,
+                HeaderValue::from_str(&user_agent)
+                    .expect("cargo package version contains only valid header characters"),
+            ),
+        ]);
+
+        Ok(HttpClient(
+            Client::builder().default_headers(headers).build()?,
+        ))
+    }
+}
+
+/// Thin wrapper around [`Client`] that maps http responses to [`Result`]s.
+struct HttpClient(Client);
+
+impl HttpClient {
+    /// Builds the error reported when a response body is not the json we expected.
+    ///
+    /// The http status and the raw body are attached as context to ease debugging.
+    fn parse_failure(status: u16, body: &[u8], source: serde_json::Error) -> Error {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Failed to parse response from rest catalog server!",
+        )
+        .with_context("status", status.to_string())
+        .with_context("json", String::from_utf8_lossy(body))
+        .with_source(source)
+    }
+
+    /// Converts a response with an unexpected status code into an [`Error`] by
+    /// deserializing its body as `E`.
+    async fn error_from_response<E: DeserializeOwned + Into<Error>>(
+        resp: reqwest::Response,
+    ) -> Error {
+        let status = resp.status().as_u16();
+        let body = match resp.bytes().await {
+            Ok(body) => body,
+            Err(e) => return e.into(),
+        };
+
+        match serde_json::from_slice::<E>(&body) {
+            Ok(model) => model.into(),
+            Err(e) => Self::parse_failure(status, &body, e),
+        }
+    }
+
+    /// Executes `request` and deserializes the response body as `R` when the status code
+    /// equals `SUCCESS_CODE`.
+    ///
+    /// # Errors
+    ///
+    /// Any other status code is reported as the error deserialized from the body as `E`.
+    /// Transport failures and bodies that can't be parsed are reported as errors too.
+    async fn query<
+        R: DeserializeOwned,
+        E: DeserializeOwned + Into<Error>,
+        const SUCCESS_CODE: u16,
+    >(
+        &self,
+        request: Request,
+    ) -> Result<R> {
+        let resp = self.0.execute(request).await?;
+        let status = resp.status().as_u16();
+
+        if status != SUCCESS_CODE {
+            return Err(Self::error_from_response::<E>(resp).await);
+        }
+
+        let body = resp.bytes().await?;
+        serde_json::from_slice::<R>(&body).map_err(|e| Self::parse_failure(status, &body, e))
+    }
+
+    /// Executes `request`, ignoring the response body when the status code equals
+    /// `SUCCESS_CODE`.
+    ///
+    /// # Errors
+    ///
+    /// Any other status code is reported as the error deserialized from the body as `E`.
+    async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>(
+        &self,
+        request: Request,
+    ) -> Result<()> {
+        let resp = self.0.execute(request).await?;
+
+        if resp.status().as_u16() == SUCCESS_CODE {
+            Ok(())
+        } else {
+            Err(Self::error_from_response::<E>(resp).await)
+        }
+    }
+}
+
+/// Rest catalog implementation.
+pub struct RestCatalog {
+    /// Catalog configuration; its properties are merged with the server side
+    /// configuration when the catalog is created.
+    config: RestCatalogConfig,
+    /// Http client used for every request to the rest catalog server.
+    client: HttpClient,
+}
+
+#[async_trait]
+impl Catalog for RestCatalog {
+    /// List namespaces from the catalog.
+    ///
+    /// When `parent` is given it is sent as the `parent` query parameter.
+    async fn list_namespaces(
+        &self,
+        parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut request = self.client.0.get(self.config.namespaces_endpoint());
+        if let Some(ns) = parent {
+            // NOTE(review): reqwest percent-encodes query values itself; if
+            // `encode_in_url` already percent-encodes, the parent is encoded twice — confirm.
+            request = request.query(&[("parent", ns.encode_in_url())]);
+        }
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        let resp = self
+            .client
+            .query::<ListNamespaceResponse, ErrorResponse, OK>(request.build()?)
+            .await?;
+
+        resp.namespaces
+            .into_iter()
+            .map(NamespaceIdent::from_vec)
+            .collect::<Result<Vec<NamespaceIdent>>>()
+    }
+
+    /// Create a new namespace inside the catalog.
+    ///
+    /// Returns the namespace as reported back by the server.
+    async fn create_namespace(
+        &self,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        let payload = NamespaceSerde {
+            namespace: namespace.as_ref().clone(),
+            properties: Some(properties),
+        };
+
+        let request = self
+            .client
+            .0
+            .post(self.config.namespaces_endpoint())
+            .json(&payload)
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        let resp = self
+            .client
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
+            .await?;
+
+        Namespace::try_from(resp)
+    }
+
+    /// Get a namespace information from the catalog.
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
+        let request = self
+            .client
+            .0
+            .get(self.config.namespace_endpoint(namespace))
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        let resp = self
+            .client
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
+            .await?;
+
+        Namespace::try_from(resp)
+    }
+
+    /// Update a namespace inside the catalog.
+    ///
+    /// # Behavior
+    ///
+    /// The properties must be the full set of namespace.
+    ///
+    /// # Errors
+    ///
+    /// Not implemented yet: always returns an `ErrorKind::FeatureUnsupported` error.
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating namespace not supported yet!",
+        ))
+    }
+
+    /// Check if namespace exists in catalog.
+    ///
+    /// Returns `Ok(true)` on `204 No Content` and `Ok(false)` on `404 Not Found`.
+    ///
+    /// # Errors
+    ///
+    /// Any other status code is reported as an error carrying the status as context.
+    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
+        let request = self
+            .client
+            .0
+            .head(self.config.namespace_endpoint(ns))
+            .build()?;
+
+        // Responses to HEAD requests carry no body, so the status code is inspected
+        // directly instead of trying to deserialize an error model.
+        let resp = self.client.0.execute(request).await?;
+        match resp.status().as_u16() {
+            NO_CONTENT => Ok(true),
+            404 => Ok(false),
+            code => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Unexpected status code while checking whether namespace exists!",
+            )
+            .with_context("status", code.to_string())),
+        }
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let request = self
+            .client
+            .0
+            .delete(self.config.namespace_endpoint(namespace))
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
+    }
+
+    /// List tables from namespace.
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        let request = self
+            .client
+            .0
+            .get(self.config.tables_endpoint(namespace))
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        let resp = self
+            .client
+            .query::<ListTableResponse, ErrorResponse, OK>(request)
+            .await?;
+
+        Ok(resp.identifiers)
+    }
+
+    /// Create a new table inside the namespace.
+    ///
+    /// # Errors
+    ///
+    /// Not implemented yet: always returns an `ErrorKind::FeatureUnsupported` error.
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Creating table not supported yet!",
+        ))
+    }
+
+    /// Load table from the catalog.
+    ///
+    /// # Errors
+    ///
+    /// Not implemented yet: always returns an `ErrorKind::FeatureUnsupported` error.
+    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Loading table not supported yet!",
+        ))
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let request = self
+            .client
+            .0
+            .delete(self.config.table_endpoint(table))
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
+    }
+
+    /// Check if a table exists in the catalog.
+    ///
+    /// Returns `Ok(true)` on `204 No Content` and `Ok(false)` on `404 Not Found`.
+    ///
+    /// # Errors
+    ///
+    /// Any other status code is reported as an error carrying the status as context.
+    async fn stat_table(&self, table: &TableIdent) -> Result<bool> {
+        let request = self
+            .client
+            .0
+            .head(self.config.table_endpoint(table))
+            .build()?;
+
+        // Responses to HEAD requests carry no body, so the status code is inspected
+        // directly instead of trying to deserialize an error model.
+        let resp = self.client.0.execute(request).await?;
+        match resp.status().as_u16() {
+            NO_CONTENT => Ok(true),
+            404 => Ok(false),
+            code => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Unexpected status code while checking whether table exists!",
+            )
+            .with_context("status", code.to_string())),
+        }
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        let payload = RenameTableRequest {
+            source: src.clone(),
+            destination: dest.clone(),
+        };
+
+        let request = self
+            .client
+            .0
+            .post(self.config.rename_table_endpoint())
+            .json(&payload)
+            .build()?;
+
+        // Error bodies are wrapped as `{"error": {...}}`, hence `ErrorResponse`.
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
+    }
+
+    /// Update a table to the catalog.
+    ///
+    /// # Errors
+    ///
+    /// Not implemented yet: always returns an `ErrorKind::FeatureUnsupported` error
+    /// instead of panicking, like the other unimplemented operations of this catalog.
+    async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating table not supported yet!",
+        ))
+    }
+
+    /// Update multiple tables to the catalog as an atomic operation.
+    ///
+    /// # Errors
+    ///
+    /// Not implemented yet: always returns an `ErrorKind::FeatureUnsupported` error
+    /// instead of panicking, like the other unimplemented operations of this catalog.
+    async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating multiple tables not supported yet!",
+        ))
+    }
+}
+
+impl RestCatalog {
+    /// Creates a rest catalog from config.
+    ///
+    /// The server side configuration is fetched and merged into `config` first; the
+    /// http client is then created again from the merged configuration.
+    pub async fn new(config: RestCatalogConfig) -> Result<Self> {
+        let client = config.try_create_rest_client()?;
+        let mut this = Self { config, client };
+
+        this.update_config().await?;
+        this.client = this.config.try_create_rest_client()?;
+
+        Ok(this)
+    }
+
+    /// Fetches the catalog configuration from the server and merges it into the
+    /// properties of this catalog.
+    ///
+    /// Precedence, lowest to highest: server `defaults`, properties given by the user,
+    /// server `overrides`.
+    async fn update_config(&mut self) -> Result<()> {
+        let builder = self.client.0.get(self.config.config_endpoint());
+        let builder = match &self.config.warehouse {
+            Some(warehouse_location) => builder.query(&[("warehouse", warehouse_location)]),
+            None => builder,
+        };
+
+        let CatalogConfig {
+            overrides,
+            defaults,
+        } = self
+            .client
+            .query::<CatalogConfig, ErrorResponse, OK>(builder.build()?)
+            .await?;
+
+        let mut merged = defaults;
+        merged.extend(self.config.props.clone());
+        merged.extend(overrides);
+        self.config.props = merged;
+
+        Ok(())
+    }
+}
+
+/// Requests and responses for rest api.
+mod _serde {
+    use std::collections::HashMap;
+
+    use serde_derive::{Deserialize, Serialize};
+
+    use iceberg::{Error, ErrorKind, Namespace, TableIdent};
+
+    /// Http status code `200 OK`.
+    pub(super) const OK: u16 = 200u16;
+    /// Http status code `204 No Content`.
+    pub(super) const NO_CONTENT: u16 = 204u16;
+
+    /// Response of the config endpoint.
+    #[derive(Clone, Debug, Serialize, Deserialize)]
+    pub(super) struct CatalogConfig {
+        pub(super) overrides: HashMap<String, String>,
+        pub(super) defaults: HashMap<String, String>,
+    }
+
+    /// Error payload wrapping an [`ErrorModel`] in an `error` field.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ErrorResponse {
+        error: ErrorModel,
+    }
+
+    impl From<ErrorResponse> for Error {
+        fn from(resp: ErrorResponse) -> Error {
+            Error::from(resp.error)
+        }
+    }
+
+    /// Description of an error reported by the server.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ErrorModel {
+        pub(super) message: String,
+        pub(super) r#type: String,
+        pub(super) code: u16,
+        pub(super) stack: Option<Vec<String>>,
+    }
+
+    impl From<ErrorModel> for Error {
+        /// Keeps the message as error message; type, code and (if present) the stack
+        /// are attached as context.
+        fn from(value: ErrorModel) -> Self {
+            let ErrorModel {
+                message,
+                r#type,
+                code,
+                stack,
+            } = value;
+
+            let error = Error::new(ErrorKind::DataInvalid, message)
+                .with_context("type", r#type)
+                .with_context("code", code.to_string());
+
+            match stack {
+                Some(frames) => error.with_context("stack", frames.join("\n")),
+                None => error,
+            }
+        }
+    }
+
+    /// Error payload of oauth requests.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct OAuthError {
+        pub(super) error: String,
+        pub(super) error_description: Option<String>,
+        pub(super) error_uri: Option<String>,
+    }
+
+    impl From<OAuthError> for Error {
+        /// Description and uri are attached as context when present.
+        fn from(value: OAuthError) -> Self {
+            let OAuthError {
+                error,
+                error_description,
+                error_uri,
+            } = value;
+
+            let mut result = Error::new(
+                ErrorKind::DataInvalid,
+                format!("OAuthError: {}", error),
+            );
+
+            if let Some(desc) = error_description {
+                result = result.with_context("description", desc);
+            }
+
+            if let Some(uri) = error_uri {
+                result = result.with_context("uri", uri);
+            }
+
+            result
+        }
+    }
+
+    /// Wire format of a namespace together with its optional properties.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct NamespaceSerde {
+        pub(super) namespace: Vec<String>,
+        pub(super) properties: Option<HashMap<String, String>>,
+    }
+
+    impl TryFrom<NamespaceSerde> for super::Namespace {
+        type Error = Error;
+
+        /// Missing properties are treated as an empty map.
+        fn try_from(value: NamespaceSerde) -> std::result::Result<Self, Self::Error> {
+            let ident = super::NamespaceIdent::from_vec(value.namespace)?;
+            let properties = value.properties.unwrap_or_default();
+            Ok(super::Namespace::with_properties(ident, properties))
+        }
+    }
+
+    impl From<&Namespace> for NamespaceSerde {
+        fn from(value: &Namespace) -> Self {
+            let namespace = value.name().as_ref().clone();
+            let properties = Some(value.properties().clone());
+            Self {
+                namespace,
+                properties,
+            }
+        }
+    }
+
+    /// Response of listing namespaces: one list of levels per namespace.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ListNamespaceResponse {
+        pub(super) namespaces: Vec<Vec<String>>,
+    }
+
+    /// Request for updating namespace properties.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct UpdateNamespacePropsRequest {
+        removals: Option<Vec<String>>,
+        updates: Option<HashMap<String, String>>,
+    }
+
+    /// Response of updating namespace properties.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct UpdateNamespacePropsResponse {
+        updated: Vec<String>,
+        removed: Vec<String>,
+        missing: Option<Vec<String>>,
+    }
+
+    /// Response of listing the tables of a namespace.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ListTableResponse {
+        pub(super) identifiers: Vec<TableIdent>,
+    }
+
+    /// Request for renaming a table.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct RenameTableRequest {
+        pub(super) source: TableIdent,
+        pub(super) destination: TableIdent,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use mockito::{Mock, Server, ServerGuard};
+
+    use super::*;
+
+    /// Creating a catalog merges the `overrides` returned by the config endpoint into
+    /// the catalog properties.
+    #[tokio::test]
+    async fn test_update_config() {
+        let mut server = Server::new_async().await;
+
+        // Reuse the shared helper rather than spelling out the same config mock again.
+        let config_mock = create_config_mock(&mut server).await;
+
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+            .await
+            .unwrap();
+
+        assert_eq!(
+            catalog.config.props.get("warehouse"),
+            Some(&"s3://iceberg-catalog".to_string())
+        );
+
+        config_mock.assert_async().await;
+    }
+
+    /// Registers a mock of the config endpoint whose overrides set `warehouse` to
+    /// `s3://iceberg-catalog`.
+    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
+        let config_endpoint = server.mock("GET", "/v1/config");
+
+        config_endpoint
+            .with_status(200)
+            .with_body(
+                r#"{
+                "overrides": {
+                    "warehouse": "s3://iceberg-catalog"
+                },
+                "defaults": {}
+            }"#,
+            )
+            .create_async()
+            .await
+    }
+
+    /// Listing namespaces returns every namespace reported by the server.
+    #[tokio::test]
+    async fn test_list_namespace() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_ns_mock = server
+            .mock("GET", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns1", "ns11"],
+                    ["ns2"]
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let actual = catalog.list_namespaces(None).await.unwrap();
+
+        let nested = NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap();
+        let top_level = NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap();
+
+        assert_eq!(vec![nested, top_level], actual);
+
+        config_mock.assert_async().await;
+        list_ns_mock.assert_async().await;
+    }
+
+    /// Creating a namespace returns the namespace reported back by the server.
+    #[tokio::test]
+    async fn test_create_namespace() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let create_ns_mock = server
+            .mock("POST", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespace": [ "ns1", "ns11"],
+                "properties" : {
+                    "key1": "value1"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let ident = NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap();
+        let props = HashMap::from([("key1".to_string(), "value1".to_string())]);
+
+        let created = catalog
+            .create_namespace(&ident, props.clone())
+            .await
+            .unwrap();
+
+        let expected_ns = Namespace::with_properties(ident, props);
+
+        assert_eq!(expected_ns, created);
+
+        config_mock.assert_async().await;
+        create_ns_mock.assert_async().await;
+    }
+
+    /// Getting a namespace returns its identifier and properties.
+    #[tokio::test]
+    async fn test_get_namespace() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let get_ns_mock = server
+            .mock("GET", "/v1/namespaces/ns1")
+            .with_body(
+                r#"{
+                "namespace": [ "ns1"],
+                "properties" : {
+                    "key1": "value1"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let ident = NamespaceIdent::new("ns1".to_string());
+        let loaded = catalog.get_namespace(&ident).await.unwrap();
+
+        let expected_ns = Namespace::with_properties(
+            ident,
+            HashMap::from([("key1".to_string(), "value1".to_string())]),
+        );
+
+        assert_eq!(expected_ns, loaded);
+
+        config_mock.assert_async().await;
+        get_ns_mock.assert_async().await;
+    }
+
+    /// A `204` answer to the HEAD request means the namespace exists.
+    #[tokio::test]
+    async fn check_namespace_exists() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let get_ns_mock = server
+            .mock("HEAD", "/v1/namespaces/ns1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let ident = NamespaceIdent::new("ns1".to_string());
+        let exists = catalog.namespace_exists(&ident).await.unwrap();
+
+        assert!(exists);
+
+        config_mock.assert_async().await;
+        get_ns_mock.assert_async().await;
+    }
+
+    /// Dropping a namespace succeeds when the server answers with `204`.
+    #[tokio::test]
+    async fn test_drop_namespace() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let drop_ns_mock = server
+            .mock("DELETE", "/v1/namespaces/ns1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let ident = NamespaceIdent::new("ns1".to_string());
+        catalog.drop_namespace(&ident).await.unwrap();
+
+        config_mock.assert_async().await;
+        drop_ns_mock.assert_async().await;
+    }
+
+    /// Listing tables returns every identifier reported by the server.
+    #[tokio::test]
+    async fn test_list_tables() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_tables_mock = server
+            .mock("GET", "/v1/namespaces/ns1/tables")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table1"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table2"
+                    }
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let ns = NamespaceIdent::new("ns1".to_string());
+        let tables = catalog.list_tables(&ns).await.unwrap();
+
+        let expected_tables = vec![
+            TableIdent::new(ns.clone(), "table1".to_string()),
+            TableIdent::new(ns, "table2".to_string()),
+        ];
+
+        assert_eq!(tables, expected_tables);
+
+        config_mock.assert_async().await;
+        list_tables_mock.assert_async().await;
+    }
+
+    /// Dropping a table succeeds when the server answers with `204`.
+    #[tokio::test]
+    async fn test_drop_tables() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let delete_table_mock = server
+            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let table = TableIdent::new(
+            NamespaceIdent::new("ns1".to_string()),
+            "table1".to_string(),
+        );
+        catalog.drop_table(&table).await.unwrap();
+
+        config_mock.assert_async().await;
+        delete_table_mock.assert_async().await;
+    }
+
+    /// A `204` answer to the HEAD request means the table exists.
+    #[tokio::test]
+    async fn test_check_table_exists() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let check_table_exists_mock = server
+            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let table = TableIdent::new(
+            NamespaceIdent::new("ns1".to_string()),
+            "table1".to_string(),
+        );
+        let exists = catalog.stat_table(&table).await.unwrap();
+
+        assert!(exists);
+
+        config_mock.assert_async().await;
+        check_table_exists_mock.assert_async().await;
+    }
+
+    /// Renaming a table succeeds when the server answers with `204`.
+    #[tokio::test]
+    async fn test_rename_table() {
+        let mut server = Server::new_async().await;
+        let config_mock = create_config_mock(&mut server).await;
+
+        let rename_table_mock = server
+            .mock("POST", "/v1/tables/rename")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let config = RestCatalogConfig::builder().uri(server.url()).build();
+        let catalog = RestCatalog::new(config).await.unwrap();
+
+        let source = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string());
+        let target = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string());
+
+        catalog.rename_table(&source, &target).await.unwrap();
+
+        config_mock.assert_async().await;
+        rename_table_mock.assert_async().await;
+    }
+}
diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs
new file mode 100644
index 0000000..023fe7a
--- /dev/null
+++ b/crates/catalog/rest/src/lib.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg REST API implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+pub use catalog::*;
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 9f2d4a8..c7caa24 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -41,10 +41,12 @@ either = "1"
 futures = "0.3"
 itertools = "0.11"
 lazy_static = "1"
+log = "^0.4"
 murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.41"
 ordered-float = "4.0.0"
+reqwest = { version = "^0.11", features = ["json"] }
 rust_decimal = "1.31.0"
 serde = { version = "^1.0", features = ["rc"] }
 serde_bytes = "0.11.8"
@@ -52,6 +54,7 @@ serde_derive = "^1.0"
 serde_json = "^1.0"
 serde_repr = "0.1.16"
 url = "2"
+urlencoding = "2"
 uuid = "1.4.1"
 
 [dev-dependencies]
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index f50d78c..d58eaa2 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -17,11 +17,15 @@
 
 //! Catalog API for Apache Iceberg
 
+use serde_derive::{Deserialize, Serialize};
+use urlencoding::encode;
+
 use crate::spec::{PartitionSpec, Schema, SortOrder};
 use crate::table::Table;
-use crate::Result;
+use crate::{Error, ErrorKind, Result};
 use async_trait::async_trait;
 use std::collections::HashMap;
+use std::ops::Deref;
 
 /// The catalog API for Iceberg Rust.
 #[async_trait]
@@ -40,6 +44,9 @@ pub trait Catalog {
     /// Get a namespace information from the catalog.
     async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace>;
 
+    /// Check if namespace exists in catalog.
+    async fn namespace_exists(&self, namesace: &NamespaceIdent) -> 
Result<bool>;
+
     /// Update a namespace inside the catalog.
     ///
     /// # Behavior
@@ -88,6 +95,8 @@ pub trait Catalog {
 /// The namespace identifier is a list of strings, where each string is a
 /// component of the namespace. It's catalog implementer's responsibility to
 /// handle the namespace identifier correctly.
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct NamespaceIdent(Vec<String>);
 
 impl NamespaceIdent {
@@ -97,8 +106,24 @@ impl NamespaceIdent {
     }
 
     /// Create a multi-level namespace identifier from vector.
-    pub fn from_vec(names: Vec<String>) -> Self {
-        Self(names)
+    pub fn from_vec(names: Vec<String>) -> Result<Self> {
+        if names.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Namespace identifier can't be empty!",
+            ));
+        }
+        Ok(Self(names))
+    }
+
+    /// Returns the namespace parts joined by the unit separator (U+001F), URL-encoded.
+    pub fn encode_in_url(&self) -> String {
+        encode(&self.as_ref().join("\u{1F}")).to_string()
+    }
+
+    /// Consumes the identifier and returns its inner strings.
+    pub fn inner(self) -> Vec<String> {
+        self.0
     }
 }
 
@@ -108,7 +133,16 @@ impl AsRef<Vec<String>> for NamespaceIdent {
     }
 }
 
+impl Deref for NamespaceIdent {
+    type Target = [String];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
 /// Namespace represents a namespace in the catalog.
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Namespace {
     name: NamespaceIdent,
     properties: HashMap<String, String>,
@@ -137,9 +171,12 @@ impl Namespace {
 }
 
 /// TableIdent represents the identifier of a table in the catalog.
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct TableIdent {
-    namespace: NamespaceIdent,
-    name: String,
+    /// Namespace of the table.
+    pub namespace: NamespaceIdent,
+    /// Table name.
+    pub name: String,
 }
 
 impl TableIdent {
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index e4ae576..8948c7d 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -307,6 +307,18 @@ define_from_err!(
     "Failed to parse url"
 );
 
+define_from_err!(
+    reqwest::Error,
+    ErrorKind::Unexpected,
+    "Failed to execute http request"
+);
+
+define_from_err!(
+    serde_json::Error,
+    ErrorKind::DataInvalid,
+    "Failed to parse json string"
+);
+
 /// Helper macro to check arguments.
 ///
 ///
diff --git a/crates/iceberg/src/rest.rs b/crates/iceberg/src/rest.rs
new file mode 100644
index 0000000..f20e961
--- /dev/null
+++ b/crates/iceberg/src/rest.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides the REST catalog implementation.
+
+pub struct RestCatalog {
+    url: String,
+}


Reply via email to