This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c3b5364  refactor(catalogs/rest): Split user config and runtime config 
(#431)
c3b5364 is described below

commit c3b5364ef0789632e77602cec1bbf6f9c8a17739
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 4 12:50:50 2024 +0800

    refactor(catalogs/rest): Split user config and runtime config (#431)
    
    * refactor(catalogs/rest): Split user config and runtime config
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Sort cargo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix unit tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove default feature of tokio
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * return error here
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Return error if cred doesn't exist
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 Cargo.toml                                     |  12 +-
 crates/catalog/rest/Cargo.toml                 |   2 +
 crates/catalog/rest/src/catalog.rs             | 762 +++++++++++--------------
 crates/catalog/rest/src/client.rs              | 178 +++++-
 crates/catalog/rest/src/lib.rs                 |   2 +
 crates/catalog/rest/src/types.rs               | 189 ++++++
 crates/catalog/rest/tests/rest_catalog_test.rs |   2 +-
 crates/examples/src/rest_catalog_namespace.rs  |   2 +-
 crates/examples/src/rest_catalog_table.rs      |   2 +-
 9 files changed, 688 insertions(+), 463 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0c2bf9c..106aa8a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,11 @@
 [workspace]
 resolver = "2"
 members = [
-  "crates/catalog/*",
-  "crates/examples",
-  "crates/iceberg",
-  "crates/integrations/*",
-  "crates/test_utils",
+    "crates/catalog/*",
+    "crates/examples",
+    "crates/iceberg",
+    "crates/integrations/*",
+    "crates/test_utils",
 ]
 
 [workspace.package]
@@ -81,7 +81,7 @@ serde_json = "^1.0"
 serde_repr = "0.1.16"
 serde_with = "3.4.0"
 tempfile = "3.8"
-tokio = { version = "1", features = ["macros"] }
+tokio = { version = "1", default-features = false }
 typed-builder = "^0.18"
 url = "2"
 urlencoding = "2"
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 7abe9c8..6492340 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -32,6 +32,7 @@ keywords = ["iceberg", "rest", "catalog"]
 # async-trait = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
+http = "1.1.0"
 iceberg = { workspace = true }
 itertools = { workspace = true }
 log = "0.4.20"
@@ -39,6 +40,7 @@ reqwest = { workspace = true }
 serde = { workspace = true }
 serde_derive = { workspace = true }
 serde_json = { workspace = true }
+tokio = { workspace = true, features = ["sync"] }
 typed-builder = { workspace = true }
 urlencoding = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 2e5b1b1..a43862a 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -24,13 +24,16 @@ use async_trait::async_trait;
 use itertools::Itertools;
 use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
 use reqwest::{Method, StatusCode, Url};
+use tokio::sync::OnceCell;
 use typed_builder::TypedBuilder;
 use urlencoding::encode;
 
-use crate::catalog::_serde::{
-    CommitTableRequest, CommitTableResponse, CreateTableRequest, 
LoadTableResponse,
-};
 use crate::client::HttpClient;
+use crate::types::{
+    CatalogConfig, CommitTableRequest, CommitTableResponse, 
CreateTableRequest, ErrorResponse,
+    ListNamespaceResponse, ListTableResponse, LoadTableResponse, 
NamespaceSerde,
+    RenameTableRequest, NO_CONTENT, OK,
+};
 use iceberg::io::FileIO;
 use iceberg::table::Table;
 use iceberg::Result;
@@ -38,17 +41,12 @@ use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, 
TableCreation, TableIdent,
 };
 
-use self::_serde::{
-    CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, 
NamespaceSerde,
-    RenameTableRequest, TokenResponse, NO_CONTENT, OK,
-};
-
 const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
 const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
 const PATH_V1: &str = "v1";
 
 /// Rest catalog configuration.
-#[derive(Debug, TypedBuilder)]
+#[derive(Clone, Debug, TypedBuilder)]
 pub struct RestCatalogConfig {
     uri: String,
     #[builder(default, setter(strip_option))]
@@ -71,7 +69,7 @@ impl RestCatalogConfig {
         [&self.uri, PATH_V1, "config"].join("/")
     }
 
-    fn get_token_endpoint(&self) -> String {
+    pub(crate) fn get_token_endpoint(&self) -> String {
         if let Some(auth_url) = self.props.get("rest.authorization-url") {
             auth_url.to_string()
         } else {
@@ -104,7 +102,41 @@ impl RestCatalogConfig {
         ])
     }
 
-    fn http_headers(&self) -> Result<HeaderMap> {
+    /// Get the token from the config.
+    ///
+    /// Client will use `token` to send requests if it exists.
+    pub(crate) fn token(&self) -> Option<String> {
+        self.props.get("token").cloned()
+    }
+
+    /// Get the credentials from the config. Client will use `credential`
+    /// to fetch a new token if it exists.
+    ///
+    /// ## Output
+    ///
+    /// - `None`: No credential is set.
+    /// - `Some((None, client_secret))`: No client_id is set, use client_secret 
directly.
+    /// - `Some((Some(client_id), client_secret))`: Both client_id and 
client_secret are set.
+    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
+        let cred = self.props.get("credential")?;
+
+        match cred.split_once(':') {
+            Some((client_id, client_secret)) => {
+                Some((Some(client_id.to_string()), client_secret.to_string()))
+            }
+            None => Some((None, cred.to_string())),
+        }
+    }
+
+    /// Get the extra headers from config.
+    ///
+    /// We will include:
+    ///
+    /// - `content-type`
+    /// - `x-client-version`
+    /// - `user-agent`
+    /// - all headers specified by `header.xxx` in props.
+    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
         let mut headers = HeaderMap::from_iter([
             (
                 header::CONTENT_TYPE,
@@ -120,73 +152,160 @@ impl RestCatalogConfig {
             ),
         ]);
 
-        if let Some(token) = self.props.get("token") {
+        for (key, value) in self
+            .props
+            .iter()
+            .filter(|(k, _)| k.starts_with("header."))
+            // The unwrap here is safe since we are filtering the keys
+            .map(|(k, v)| (k.strip_prefix("header.").unwrap(), v))
+        {
             headers.insert(
-                header::AUTHORIZATION,
-                HeaderValue::from_str(&format!("Bearer {token}")).map_err(|e| {
+                HeaderName::from_str(key).map_err(|e| {
                     Error::new(
                         ErrorKind::DataInvalid,
-                        "Invalid token received from catalog server!",
+                        format!("Invalid header name: {key}"),
+                    )
+                    .with_source(e)
+                })?,
+                HeaderValue::from_str(value).map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid header value: {value}"),
                     )
                     .with_source(e)
                 })?,
             );
         }
 
-        for (key, value) in self.props.iter() {
-            if let Some(stripped_key) = key.strip_prefix("header.") {
-                // Avoid overwriting default headers
-                if !headers.contains_key(stripped_key) {
-                    headers.insert(
-                        HeaderName::from_str(stripped_key).map_err(|e| {
-                            Error::new(
-                                ErrorKind::DataInvalid,
-                                format!("Invalid header name: 
{stripped_key}!"),
-                            )
-                            .with_source(e)
-                        })?,
-                        HeaderValue::from_str(value).map_err(|e| {
-                            Error::new(
-                                ErrorKind::DataInvalid,
-                                format!("Invalid header value: {value}!"),
-                            )
-                            .with_source(e)
-                        })?,
-                    );
-                }
-            }
-        }
         Ok(headers)
     }
 
-    fn try_create_rest_client(&self) -> Result<HttpClient> {
-        // TODO: We will add ssl config, sigv4 later
-        let headers = self.http_headers()?;
-        HttpClient::try_create(headers)
-    }
+    /// Get the extra oauth params from the config.
+    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
+        let mut params = HashMap::new();
 
-    fn optional_oauth_params(&self) -> HashMap<&str, &str> {
-        let mut optional_oauth_param = HashMap::new();
         if let Some(scope) = self.props.get("scope") {
-            optional_oauth_param.insert("scope", scope.as_str());
+            params.insert("scope".to_string(), scope.to_string());
         } else {
-            optional_oauth_param.insert("scope", "catalog");
+            params.insert("scope".to_string(), "catalog".to_string());
         }
-        let set_of_optional_params = ["audience", "resource"];
-        for param_name in set_of_optional_params.iter() {
-            if let Some(value) = self.props.get(*param_name) {
-                optional_oauth_param.insert(param_name.to_owned(), value);
+
+        let optional_params = ["audience", "resource"];
+        for param_name in optional_params {
+            if let Some(value) = self.props.get(param_name) {
+                params.insert(param_name.to_string(), value.to_string());
             }
         }
-        optional_oauth_param
+        params
+    }
+
+    /// Merge the config with the given config fetched from rest server.
+    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> 
Self {
+        if let Some(uri) = config.overrides.remove("uri") {
+            self.uri = uri;
+        }
+
+        let mut props = config.defaults;
+        props.extend(self.props);
+        props.extend(config.overrides);
+
+        self.props = props;
+        self
     }
 }
 
+#[derive(Debug)]
+struct RestContext {
+    client: HttpClient,
+
+    /// Runtime config is fetched from rest server and stored here.
+    ///
+    /// It could be different from the user config.
+    config: RestCatalogConfig,
+}
+
+impl RestContext {}
+
 /// Rest catalog implementation.
 #[derive(Debug)]
 pub struct RestCatalog {
-    config: RestCatalogConfig,
-    client: HttpClient,
+    /// User config is stored as-is and is never changed.
+    ///
+    /// It could be different from the config fetched from the server and 
used at runtime.
+    user_config: RestCatalogConfig,
+    ctx: OnceCell<RestContext>,
+}
+
+impl RestCatalog {
+    /// Creates a rest catalog from config.
+    pub fn new(config: RestCatalogConfig) -> Self {
+        Self {
+            user_config: config,
+            ctx: OnceCell::new(),
+        }
+    }
+
+    /// Get the context from the catalog.
+    async fn context(&self) -> Result<&RestContext> {
+        self.ctx
+            .get_or_try_init(|| async {
+                let catalog_config = 
RestCatalog::load_config(&self.user_config).await?;
+                let config = 
self.user_config.clone().merge_with_config(catalog_config);
+                let client = HttpClient::new(&config)?;
+
+                Ok(RestContext { config, client })
+            })
+            .await
+    }
+
+    /// Load the runtime config from the server by user_config.
+    ///
+    /// It's required for a rest catalog to update its config after creation.
+    async fn load_config(user_config: &RestCatalogConfig) -> 
Result<CatalogConfig> {
+        let client = HttpClient::new(user_config)?;
+
+        let mut request = client.request(Method::GET, 
user_config.config_endpoint());
+
+        if let Some(warehouse_location) = &user_config.warehouse {
+            request = request.query(&[("warehouse", warehouse_location)]);
+        }
+
+        let config = client
+            .query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
+            .await?;
+        Ok(config)
+    }
+
+    async fn load_file_io(
+        &self,
+        metadata_location: Option<&str>,
+        extra_config: Option<HashMap<String, String>>,
+    ) -> Result<FileIO> {
+        let mut props = self.context().await?.config.props.clone();
+        if let Some(config) = extra_config {
+            props.extend(config);
+        }
+
+        // If the warehouse is a logical identifier instead of a URL we don't 
want
+        // to raise an exception
+        let warehouse_path = match 
self.context().await?.config.warehouse.as_deref() {
+            Some(url) if Url::parse(url).is_ok() => Some(url),
+            Some(_) => None,
+            None => None,
+        };
+
+        let file_io = match warehouse_path.or(metadata_location) {
+            Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Unable to load file io, neither warehouse nor metadata 
location is set!",
+                ))?
+            }
+        };
+
+        Ok(file_io)
+    }
 }
 
 #[async_trait]
@@ -196,14 +315,17 @@ impl Catalog for RestCatalog {
         &self,
         parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        let mut request = self
-            .client
-            .request(Method::GET, self.config.namespaces_endpoint());
+        let mut request = self.context().await?.client.request(
+            Method::GET,
+            self.context().await?.config.namespaces_endpoint(),
+        );
         if let Some(ns) = parent {
             request = request.query(&[("parent", ns.encode_in_url())]);
         }
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<ListNamespaceResponse, ErrorResponse, 
OK>(request.build()?)
             .await?;
@@ -221,8 +343,13 @@ impl Catalog for RestCatalog {
         properties: HashMap<String, String>,
     ) -> Result<Namespace> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::POST, self.config.namespaces_endpoint())
+            .request(
+                Method::POST,
+                self.context().await?.config.namespaces_endpoint(),
+            )
             .json(&NamespaceSerde {
                 namespace: namespace.as_ref().clone(),
                 properties: Some(properties),
@@ -230,6 +357,8 @@ impl Catalog for RestCatalog {
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<NamespaceSerde, ErrorResponse, OK>(request)
             .await?;
@@ -240,11 +369,18 @@ impl Catalog for RestCatalog {
     /// Get a namespace information from the catalog.
     async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::GET, self.config.namespace_endpoint(namespace))
+            .request(
+                Method::GET,
+                self.context().await?.config.namespace_endpoint(namespace),
+            )
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<NamespaceSerde, ErrorResponse, OK>(request)
             .await?;
@@ -269,11 +405,18 @@ impl Catalog for RestCatalog {
 
     async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::HEAD, self.config.namespace_endpoint(ns))
+            .request(
+                Method::HEAD,
+                self.context().await?.config.namespace_endpoint(ns),
+            )
             .build()?;
 
-        self.client
+        self.context()
+            .await?
+            .client
             .do_execute::<bool, ErrorResponse>(request, |resp| match 
resp.status() {
                 StatusCode::NO_CONTENT => Some(true),
                 StatusCode::NOT_FOUND => Some(false),
@@ -285,11 +428,18 @@ impl Catalog for RestCatalog {
     /// Drop a namespace from the catalog.
     async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::DELETE, self.config.namespace_endpoint(namespace))
+            .request(
+                Method::DELETE,
+                self.context().await?.config.namespace_endpoint(namespace),
+            )
             .build()?;
 
-        self.client
+        self.context()
+            .await?
+            .client
             .execute::<ErrorResponse, NO_CONTENT>(request)
             .await
     }
@@ -297,11 +447,18 @@ impl Catalog for RestCatalog {
     /// List tables from namespace.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::GET, self.config.tables_endpoint(namespace))
+            .request(
+                Method::GET,
+                self.context().await?.config.tables_endpoint(namespace),
+            )
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<ListTableResponse, ErrorResponse, OK>(request)
             .await?;
@@ -318,8 +475,13 @@ impl Catalog for RestCatalog {
         let table_ident = TableIdent::new(namespace.clone(), 
creation.name.clone());
 
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::POST, self.config.tables_endpoint(namespace))
+            .request(
+                Method::POST,
+                self.context().await?.config.tables_endpoint(namespace),
+            )
             .json(&CreateTableRequest {
                 name: creation.name,
                 location: creation.location,
@@ -337,11 +499,15 @@ impl Catalog for RestCatalog {
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<LoadTableResponse, ErrorResponse, OK>(request)
             .await?;
 
-        let file_io = self.load_file_io(resp.metadata_location.as_deref(), 
resp.config)?;
+        let file_io = self
+            .load_file_io(resp.metadata_location.as_deref(), resp.config)
+            .await?;
 
         let table = Table::builder()
             .identifier(table_ident)
@@ -361,16 +527,25 @@ impl Catalog for RestCatalog {
     /// Load table from the catalog.
     async fn load_table(&self, table: &TableIdent) -> Result<Table> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::GET, self.config.table_endpoint(table))
+            .request(
+                Method::GET,
+                self.context().await?.config.table_endpoint(table),
+            )
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<LoadTableResponse, ErrorResponse, OK>(request)
             .await?;
 
-        let file_io = self.load_file_io(resp.metadata_location.as_deref(), 
resp.config)?;
+        let file_io = self
+            .load_file_io(resp.metadata_location.as_deref(), resp.config)
+            .await?;
 
         let table_builder = Table::builder()
             .identifier(table.clone())
@@ -387,11 +562,18 @@ impl Catalog for RestCatalog {
     /// Drop a table from the catalog.
     async fn drop_table(&self, table: &TableIdent) -> Result<()> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::DELETE, self.config.table_endpoint(table))
+            .request(
+                Method::DELETE,
+                self.context().await?.config.table_endpoint(table),
+            )
             .build()?;
 
-        self.client
+        self.context()
+            .await?
+            .client
             .execute::<ErrorResponse, NO_CONTENT>(request)
             .await
     }
@@ -399,11 +581,18 @@ impl Catalog for RestCatalog {
     /// Check if a table exists in the catalog.
     async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::HEAD, self.config.table_endpoint(table))
+            .request(
+                Method::HEAD,
+                self.context().await?.config.table_endpoint(table),
+            )
             .build()?;
 
-        self.client
+        self.context()
+            .await?
+            .client
             .do_execute::<bool, ErrorResponse>(request, |resp| match 
resp.status() {
                 StatusCode::NO_CONTENT => Some(true),
                 StatusCode::NOT_FOUND => Some(false),
@@ -415,15 +604,22 @@ impl Catalog for RestCatalog {
     /// Rename a table in the catalog.
     async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> 
Result<()> {
         let request = self
+            .context()
+            .await?
             .client
-            .request(Method::POST, self.config.rename_table_endpoint())
+            .request(
+                Method::POST,
+                self.context().await?.config.rename_table_endpoint(),
+            )
             .json(&RenameTableRequest {
                 source: src.clone(),
                 destination: dest.clone(),
             })
             .build()?;
 
-        self.client
+        self.context()
+            .await?
+            .client
             .execute::<ErrorResponse, NO_CONTENT>(request)
             .await
     }
@@ -431,10 +627,15 @@ impl Catalog for RestCatalog {
     /// Update table.
     async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
         let request = self
+            .context()
+            .await?
             .client
             .request(
                 Method::POST,
-                self.config.table_endpoint(commit.identifier()),
+                self.context()
+                    .await?
+                    .config
+                    .table_endpoint(commit.identifier()),
             )
             .json(&CommitTableRequest {
                 identifier: commit.identifier().clone(),
@@ -444,11 +645,15 @@ impl Catalog for RestCatalog {
             .build()?;
 
         let resp = self
+            .context()
+            .await?
             .client
             .query::<CommitTableResponse, ErrorResponse, OK>(request)
             .await?;
 
-        let file_io = self.load_file_io(Some(&resp.metadata_location), None)?;
+        let file_io = self
+            .load_file_io(Some(&resp.metadata_location), None)
+            .await?;
         Ok(Table::builder()
             .identifier(commit.identifier().clone())
             .file_io(file_io)
@@ -458,295 +663,6 @@ impl Catalog for RestCatalog {
     }
 }
 
-impl RestCatalog {
-    /// Creates a rest catalog from config.
-    pub async fn new(config: RestCatalogConfig) -> Result<Self> {
-        let mut catalog = Self {
-            client: config.try_create_rest_client()?,
-            config,
-        };
-        catalog.fetch_access_token().await?;
-        catalog.client = catalog.config.try_create_rest_client()?;
-        catalog.update_config().await?;
-        catalog.client = catalog.config.try_create_rest_client()?;
-
-        Ok(catalog)
-    }
-
-    async fn fetch_access_token(&mut self) -> Result<()> {
-        if self.config.props.contains_key("token") {
-            return Ok(());
-        }
-        if let Some(credential) = self.config.props.get("credential") {
-            let (client_id, client_secret) = if credential.contains(':') {
-                let (client_id, client_secret) = 
credential.split_once(':').unwrap();
-                (Some(client_id), client_secret)
-            } else {
-                (None, credential.as_str())
-            };
-            let mut params = HashMap::with_capacity(4);
-            params.insert("grant_type", "client_credentials");
-            if let Some(client_id) = client_id {
-                params.insert("client_id", client_id);
-            }
-            params.insert("client_secret", client_secret);
-            let optional_oauth_params = self.config.optional_oauth_params();
-            params.extend(optional_oauth_params);
-            let req = self
-                .client
-                .request(Method::POST, self.config.get_token_endpoint())
-                .form(&params)
-                .build()?;
-            let res = self
-                .client
-                .query::<TokenResponse, ErrorResponse, OK>(req)
-                .await
-                .map_err(|e| {
-                    Error::new(
-                        ErrorKind::Unexpected,
-                        "Failed to fetch access token from catalog server!",
-                    )
-                    .with_source(e)
-                })?;
-            let token = res.access_token;
-            self.config.props.insert("token".to_string(), token);
-        }
-
-        Ok(())
-    }
-
-    async fn update_config(&mut self) -> Result<()> {
-        let mut request = self
-            .client
-            .request(Method::GET, self.config.config_endpoint());
-
-        if let Some(warehouse_location) = &self.config.warehouse {
-            request = request.query(&[("warehouse", warehouse_location)]);
-        }
-
-        let mut config = self
-            .client
-            .query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
-            .await?;
-
-        let mut props = config.defaults;
-        props.extend(self.config.props.clone());
-        if let Some(uri) = config.overrides.remove("uri") {
-            self.config.uri = uri;
-        }
-        props.extend(config.overrides);
-
-        self.config.props = props;
-
-        Ok(())
-    }
-
-    fn load_file_io(
-        &self,
-        metadata_location: Option<&str>,
-        extra_config: Option<HashMap<String, String>>,
-    ) -> Result<FileIO> {
-        let mut props = self.config.props.clone();
-        if let Some(config) = extra_config {
-            props.extend(config);
-        }
-
-        // If the warehouse is a logical identifier instead of a URL we don't 
want
-        // to raise an exception
-        let warehouse_path = match self.config.warehouse.as_deref() {
-            Some(url) if Url::parse(url).is_ok() => Some(url),
-            Some(_) => None,
-            None => None,
-        };
-
-        let file_io = match warehouse_path.or(metadata_location) {
-            Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
-            None => {
-                return Err(Error::new(
-                    ErrorKind::Unexpected,
-                    "Unable to load file io, neither warehouse nor metadata 
location is set!",
-                ))?
-            }
-        };
-
-        Ok(file_io)
-    }
-}
-
-/// Requests and responses for rest api.
-mod _serde {
-    use std::collections::HashMap;
-
-    use serde_derive::{Deserialize, Serialize};
-
-    use iceberg::spec::{Schema, SortOrder, TableMetadata, 
UnboundPartitionSpec};
-    use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, 
TableUpdate};
-
-    pub(super) const OK: u16 = 200u16;
-    pub(super) const NO_CONTENT: u16 = 204u16;
-
-    #[derive(Clone, Debug, Serialize, Deserialize)]
-    pub(super) struct CatalogConfig {
-        pub(super) overrides: HashMap<String, String>,
-        pub(super) defaults: HashMap<String, String>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct ErrorResponse {
-        error: ErrorModel,
-    }
-
-    impl From<ErrorResponse> for Error {
-        fn from(resp: ErrorResponse) -> Error {
-            resp.error.into()
-        }
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct ErrorModel {
-        pub(super) message: String,
-        pub(super) r#type: String,
-        pub(super) code: u16,
-        pub(super) stack: Option<Vec<String>>,
-    }
-
-    impl From<ErrorModel> for Error {
-        fn from(value: ErrorModel) -> Self {
-            let mut error = Error::new(ErrorKind::DataInvalid, value.message)
-                .with_context("type", value.r#type)
-                .with_context("code", format!("{}", value.code));
-
-            if let Some(stack) = value.stack {
-                error = error.with_context("stack", stack.join("\n"));
-            }
-
-            error
-        }
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct OAuthError {
-        pub(super) error: String,
-        pub(super) error_description: Option<String>,
-        pub(super) error_uri: Option<String>,
-    }
-
-    impl From<OAuthError> for Error {
-        fn from(value: OAuthError) -> Self {
-            let mut error = Error::new(
-                ErrorKind::DataInvalid,
-                format!("OAuthError: {}", value.error),
-            );
-
-            if let Some(desc) = value.error_description {
-                error = error.with_context("description", desc);
-            }
-
-            if let Some(uri) = value.error_uri {
-                error = error.with_context("uri", uri);
-            }
-
-            error
-        }
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct TokenResponse {
-        pub(super) access_token: String,
-        pub(super) token_type: String,
-        pub(super) expires_in: Option<u64>,
-        pub(super) issued_token_type: Option<String>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct NamespaceSerde {
-        pub(super) namespace: Vec<String>,
-        pub(super) properties: Option<HashMap<String, String>>,
-    }
-
-    impl TryFrom<NamespaceSerde> for super::Namespace {
-        type Error = Error;
-        fn try_from(value: NamespaceSerde) -> std::result::Result<Self, 
Self::Error> {
-            Ok(super::Namespace::with_properties(
-                super::NamespaceIdent::from_vec(value.namespace)?,
-                value.properties.unwrap_or_default(),
-            ))
-        }
-    }
-
-    impl From<&Namespace> for NamespaceSerde {
-        fn from(value: &Namespace) -> Self {
-            Self {
-                namespace: value.name().as_ref().clone(),
-                properties: Some(value.properties().clone()),
-            }
-        }
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct ListNamespaceResponse {
-        pub(super) namespaces: Vec<Vec<String>>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct UpdateNamespacePropsRequest {
-        removals: Option<Vec<String>>,
-        updates: Option<HashMap<String, String>>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct UpdateNamespacePropsResponse {
-        updated: Vec<String>,
-        removed: Vec<String>,
-        missing: Option<Vec<String>>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct ListTableResponse {
-        pub(super) identifiers: Vec<TableIdent>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct RenameTableRequest {
-        pub(super) source: TableIdent,
-        pub(super) destination: TableIdent,
-    }
-
-    #[derive(Debug, Deserialize)]
-    #[serde(rename_all = "kebab-case")]
-    pub(super) struct LoadTableResponse {
-        pub(super) metadata_location: Option<String>,
-        pub(super) metadata: TableMetadata,
-        pub(super) config: Option<HashMap<String, String>>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    #[serde(rename_all = "kebab-case")]
-    pub(super) struct CreateTableRequest {
-        pub(super) name: String,
-        pub(super) location: Option<String>,
-        pub(super) schema: Schema,
-        pub(super) partition_spec: Option<UnboundPartitionSpec>,
-        pub(super) write_order: Option<SortOrder>,
-        pub(super) stage_create: Option<bool>,
-        pub(super) properties: Option<HashMap<String, String>>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    pub(super) struct CommitTableRequest {
-        pub(super) identifier: TableIdent,
-        pub(super) requirements: Vec<TableRequirement>,
-        pub(super) updates: Vec<TableUpdate>,
-    }
-
-    #[derive(Debug, Serialize, Deserialize)]
-    #[serde(rename_all = "kebab-case")]
-    pub(super) struct CommitTableResponse {
-        pub(super) metadata_location: String,
-        pub(super) metadata: TableMetadata,
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use chrono::{TimeZone, Utc};
@@ -783,12 +699,16 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         assert_eq!(
-            catalog.config.props.get("warehouse"),
+            catalog
+                .context()
+                .await
+                .unwrap()
+                .config
+                .props
+                .get("warehouse"),
             Some(&"s3://iceberg-catalog".to_string())
         );
 
@@ -827,6 +747,7 @@ mod tests {
                 "expires_in": 86400
                 }"#,
             )
+            .expect(2)
             .create_async()
             .await
     }
@@ -845,16 +766,12 @@ mod tests {
                 .uri(server.url())
                 .props(props)
                 .build(),
-        )
-        .await
-        .unwrap();
+        );
 
+        let token = catalog.context().await.unwrap().client.token().await;
         oauth_mock.assert_async().await;
         config_mock.assert_async().await;
-        assert_eq!(
-            catalog.config.props.get("token"),
-            Some(&"ey000000000000".to_string())
-        );
+        assert_eq!(token, Some("ey000000000000".to_string()));
     }
 
     #[tokio::test]
@@ -884,6 +801,7 @@ mod tests {
                 "expires_in": 86400
                 }"#,
             )
+            .expect(2)
             .create_async()
             .await;
 
@@ -894,16 +812,13 @@ mod tests {
                 .uri(server.url())
                 .props(props)
                 .build(),
-        )
-        .await
-        .unwrap();
+        );
+
+        let token = catalog.context().await.unwrap().client.token().await;
 
         oauth_mock.assert_async().await;
         config_mock.assert_async().await;
-        assert_eq!(
-            catalog.config.props.get("token"),
-            Some(&"ey000000000000".to_string())
-        );
+        assert_eq!(token, Some("ey000000000000".to_string()));
     }
 
     #[tokio::test]
@@ -916,7 +831,7 @@ mod tests {
             .uri(server.url())
             .props(props)
             .build();
-        let headers: HeaderMap = config.http_headers().unwrap();
+        let headers: HeaderMap = config.extra_headers().unwrap();
 
         let expected_headers = HeaderMap::from_iter([
             (
@@ -953,12 +868,12 @@ mod tests {
             .uri(server.url())
             .props(props)
             .build();
-        let headers: HeaderMap = config.http_headers().unwrap();
+        let headers: HeaderMap = config.extra_headers().unwrap();
 
         let expected_headers = HeaderMap::from_iter([
             (
                 header::CONTENT_TYPE,
-                HeaderValue::from_static("application/json"),
+                HeaderValue::from_static("application/yaml"),
             ),
             (
                 HeaderName::from_static("x-client-version"),
@@ -997,16 +912,13 @@ mod tests {
                 .uri(server.url())
                 .props(props)
                 .build(),
-        )
-        .await
-        .unwrap();
+        );
+
+        let token = catalog.context().await.unwrap().client.token().await;
 
         oauth_mock.assert_async().await;
         config_mock.assert_async().await;
-        assert_eq!(
-            catalog.config.props.get("token"),
-            Some(&"ey000000000000".to_string())
-        );
+        assert_eq!(token, Some("ey000000000000".to_string()));
     }
 
     #[tokio::test]
@@ -1044,9 +956,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let _namespaces = catalog.list_namespaces(None).await.unwrap();
 
@@ -1073,9 +983,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let namespaces = catalog.list_namespaces(None).await.unwrap();
 
@@ -1109,9 +1017,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let namespaces = catalog
             .create_namespace(
@@ -1151,9 +1057,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let namespaces = catalog
             .get_namespace(&NamespaceIdent::new("ns1".to_string()))
@@ -1183,9 +1087,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         assert!(catalog
             .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
@@ -1208,9 +1110,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         catalog
             .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
@@ -1247,9 +1147,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let tables = catalog
             .list_tables(&NamespaceIdent::new("ns1".to_string()))
@@ -1279,9 +1177,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         catalog
             .drop_table(&TableIdent::new(
@@ -1307,9 +1203,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         assert!(catalog
             .table_exists(&TableIdent::new(
@@ -1335,9 +1229,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         catalog
             .rename_table(
@@ -1368,9 +1260,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table = catalog
             .load_table(&TableIdent::new(
@@ -1481,9 +1371,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table = catalog
             .load_table(&TableIdent::new(
@@ -1520,9 +1408,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table_creation = TableCreation::builder()
             .name("test1".to_string())
@@ -1662,9 +1548,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table_creation = TableCreation::builder()
             .name("test1".to_string())
@@ -1717,9 +1601,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table1 = {
             let file = File::open(format!(
@@ -1839,9 +1721,7 @@ mod tests {
             .create_async()
             .await;
 
-        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
-            .await
-            .unwrap();
+        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
 
         let table1 = {
             let file = File::open(format!(
diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs
index dbfd9de..43e14c7 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -15,25 +15,171 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::types::{ErrorResponse, TokenResponse, OK};
+use crate::RestCatalogConfig;
 use iceberg::Result;
 use iceberg::{Error, ErrorKind};
 use reqwest::header::HeaderMap;
 use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
 use serde::de::DeserializeOwned;
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::Mutex;
 
-#[derive(Debug)]
-pub(crate) struct HttpClient(Client);
+pub(crate) struct HttpClient {
+    client: Client,
+
+    /// The token to be used for authentication.
+    ///
+    /// It's possible to fetch the token from the server while needed.
+    token: Mutex<Option<String>>,
+    /// The token endpoint to be used for authentication.
+    token_endpoint: String,
+    /// The credential to be used for authentication.
+    credential: Option<(Option<String>, String)>,
+    /// Extra headers to be added to each request.
+    extra_headers: HeaderMap,
+    /// Extra oauth parameters to be added to each authentication request.
+    extra_oauth_params: HashMap<String, String>,
+}
+
+impl Debug for HttpClient {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HttpClient")
+            .field("client", &self.client)
+            .field("extra_headers", &self.extra_headers)
+            .finish_non_exhaustive()
+    }
+}
 
 impl HttpClient {
-    pub fn try_create(default_headers: HeaderMap) -> Result<Self> {
-        Ok(HttpClient(
-            Client::builder().default_headers(default_headers).build()?,
-        ))
+    pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
+        Ok(HttpClient {
+            client: Client::new(),
+
+            token: Mutex::new(cfg.token()),
+            token_endpoint: cfg.get_token_endpoint(),
+            credential: cfg.credential(),
+            extra_headers: cfg.extra_headers()?,
+            extra_oauth_params: cfg.extra_oauth_params(),
+        })
+    }
+
+    /// This API is testing only to assert the token.
+    #[cfg(test)]
+    pub(crate) async fn token(&self) -> Option<String> {
+        let mut req = self
+            .request(Method::GET, &self.token_endpoint)
+            .build()
+            .unwrap();
+        self.authenticate(&mut req).await.ok();
+        self.token.lock().unwrap().clone()
+    }
+
+    /// Authenticate the request by filling token.
+    ///
+    /// - If neither token nor credential is provided, this method will do nothing.
+    /// - If only credential is provided, this method will try to fetch token from the server.
+    /// - If token is provided, this method will use the token directly.
+    ///
+    /// # TODO
+    ///
+    /// Support refreshing token while needed.
+    async fn authenticate(&self, req: &mut Request) -> Result<()> {
+        // Clone the token from lock without holding the lock for entire function.
+        let token = { self.token.lock().expect("lock poison").clone() };
+
+        if self.credential.is_none() && token.is_none() {
+            return Ok(());
+        }
+
+        // Use token if provided.
+        if let Some(token) = &token {
+            req.headers_mut().insert(
+                http::header::AUTHORIZATION,
+                format!("Bearer {token}").parse().map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Invalid token received from catalog server!",
+                    )
+                    .with_source(e)
+                })?,
+            );
+            return Ok(());
+        }
+
+        // Credential must exist here.
+        let (client_id, client_secret) = self.credential.as_ref().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Credential must be provided for authentication",
+            )
+        })?;
+
+        let mut params = HashMap::with_capacity(4);
+        params.insert("grant_type", "client_credentials");
+        if let Some(client_id) = client_id {
+            params.insert("client_id", client_id);
+        }
+        params.insert("client_secret", client_secret);
+        params.extend(
+            self.extra_oauth_params
+                .iter()
+                .map(|(k, v)| (k.as_str(), v.as_str())),
+        );
+
+        let auth_req = self
+            .client
+            .request(Method::POST, &self.token_endpoint)
+            .form(&params)
+            .build()?;
+        let auth_resp = self.client.execute(auth_req).await?;
+
+        let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
+            let text = auth_resp.bytes().await?;
+            Ok(serde_json::from_slice(&text).map_err(|e| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to parse response from rest catalog server!",
+                )
+                .with_context("json", String::from_utf8_lossy(&text))
+                .with_source(e)
+            })?)
+        } else {
+            let code = auth_resp.status();
+            let text = auth_resp.bytes().await?;
+            let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to parse response from rest catalog server!",
+                )
+                .with_context("json", String::from_utf8_lossy(&text))
+                .with_context("code", code.to_string())
+                .with_source(e)
+            })?;
+            Err(Error::from(e))
+        }?;
+        let token = auth_res.access_token;
+        // Update token.
+        *self.token.lock().expect("lock poison") = Some(token.clone());
+        // Insert token in request.
+        req.headers_mut().insert(
+            http::header::AUTHORIZATION,
+            format!("Bearer {token}").parse().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Invalid token received from catalog server!",
+                )
+                .with_source(e)
+            })?,
+        );
+
+        Ok(())
     }
 
     #[inline]
     pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
-        self.0.request(method, url)
+        self.client.request(method, url)
     }
 
     pub async fn query<
@@ -42,9 +188,11 @@ impl HttpClient {
         const SUCCESS_CODE: u16,
     >(
         &self,
-        request: Request,
+        mut request: Request,
     ) -> Result<R> {
-        let resp = self.0.execute(request).await?;
+        self.authenticate(&mut request).await?;
+
+        let resp = self.client.execute(request).await?;
 
         if resp.status().as_u16() == SUCCESS_CODE {
             let text = resp.bytes().await?;
@@ -74,9 +222,11 @@ impl HttpClient {
 
     pub async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>(
         &self,
-        request: Request,
+        mut request: Request,
     ) -> Result<()> {
-        let resp = self.0.execute(request).await?;
+        self.authenticate(&mut request).await?;
+
+        let resp = self.client.execute(request).await?;
 
         if resp.status().as_u16() == SUCCESS_CODE {
             Ok(())
@@ -99,10 +249,12 @@ impl HttpClient {
     /// More generic logic handling for special cases like head.
     pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
         &self,
-        request: Request,
+        mut request: Request,
         handler: impl FnOnce(&Response) -> Option<R>,
     ) -> Result<R> {
-        let resp = self.0.execute(request).await?;
+        self.authenticate(&mut request).await?;
+
+        let resp = self.client.execute(request).await?;
 
         if let Some(ret) = handler(&resp) {
             Ok(ret)
diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs
index 6b0a784..f94ee87 100644
--- a/crates/catalog/rest/src/lib.rs
+++ b/crates/catalog/rest/src/lib.rs
@@ -21,4 +21,6 @@
 
 mod catalog;
 mod client;
+mod types;
+
 pub use catalog::*;
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
new file mode 100644
index 0000000..c8d704b
--- /dev/null
+++ b/crates/catalog/rest/src/types.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use serde_derive::{Deserialize, Serialize};
+
+use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
+use iceberg::{
+    Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate,
+};
+
+pub(super) const OK: u16 = 200u16;
+pub(super) const NO_CONTENT: u16 = 204u16;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub(super) struct CatalogConfig {
+    pub(super) overrides: HashMap<String, String>,
+    pub(super) defaults: HashMap<String, String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ErrorResponse {
+    error: ErrorModel,
+}
+
+impl From<ErrorResponse> for Error {
+    fn from(resp: ErrorResponse) -> Error {
+        resp.error.into()
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ErrorModel {
+    pub(super) message: String,
+    pub(super) r#type: String,
+    pub(super) code: u16,
+    pub(super) stack: Option<Vec<String>>,
+}
+
+impl From<ErrorModel> for Error {
+    fn from(value: ErrorModel) -> Self {
+        let mut error = Error::new(ErrorKind::DataInvalid, value.message)
+            .with_context("type", value.r#type)
+            .with_context("code", format!("{}", value.code));
+
+        if let Some(stack) = value.stack {
+            error = error.with_context("stack", stack.join("\n"));
+        }
+
+        error
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct OAuthError {
+    pub(super) error: String,
+    pub(super) error_description: Option<String>,
+    pub(super) error_uri: Option<String>,
+}
+
+impl From<OAuthError> for Error {
+    fn from(value: OAuthError) -> Self {
+        let mut error = Error::new(
+            ErrorKind::DataInvalid,
+            format!("OAuthError: {}", value.error),
+        );
+
+        if let Some(desc) = value.error_description {
+            error = error.with_context("description", desc);
+        }
+
+        if let Some(uri) = value.error_uri {
+            error = error.with_context("uri", uri);
+        }
+
+        error
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct TokenResponse {
+    pub(super) access_token: String,
+    pub(super) token_type: String,
+    pub(super) expires_in: Option<u64>,
+    pub(super) issued_token_type: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct NamespaceSerde {
+    pub(super) namespace: Vec<String>,
+    pub(super) properties: Option<HashMap<String, String>>,
+}
+
+impl TryFrom<NamespaceSerde> for Namespace {
+    type Error = Error;
+    fn try_from(value: NamespaceSerde) -> std::result::Result<Self, Self::Error> {
+        Ok(Namespace::with_properties(
+            NamespaceIdent::from_vec(value.namespace)?,
+            value.properties.unwrap_or_default(),
+        ))
+    }
+}
+
+impl From<&Namespace> for NamespaceSerde {
+    fn from(value: &Namespace) -> Self {
+        Self {
+            namespace: value.name().as_ref().clone(),
+            properties: Some(value.properties().clone()),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ListNamespaceResponse {
+    pub(super) namespaces: Vec<Vec<String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct UpdateNamespacePropsRequest {
+    removals: Option<Vec<String>>,
+    updates: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct UpdateNamespacePropsResponse {
+    updated: Vec<String>,
+    removed: Vec<String>,
+    missing: Option<Vec<String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ListTableResponse {
+    pub(super) identifiers: Vec<TableIdent>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct RenameTableRequest {
+    pub(super) source: TableIdent,
+    pub(super) destination: TableIdent,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct LoadTableResponse {
+    pub(super) metadata_location: Option<String>,
+    pub(super) metadata: TableMetadata,
+    pub(super) config: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct CreateTableRequest {
+    pub(super) name: String,
+    pub(super) location: Option<String>,
+    pub(super) schema: Schema,
+    pub(super) partition_spec: Option<UnboundPartitionSpec>,
+    pub(super) write_order: Option<SortOrder>,
+    pub(super) stage_create: Option<bool>,
+    pub(super) properties: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct CommitTableRequest {
+    pub(super) identifier: TableIdent,
+    pub(super) requirements: Vec<TableRequirement>,
+    pub(super) updates: Vec<TableUpdate>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct CommitTableResponse {
+    pub(super) metadata_location: String,
+    pub(super) metadata: TableMetadata,
+}
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs
index 205428d..3c22241 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -59,7 +59,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
     let config = RestCatalogConfig::builder()
         .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
         .build();
-    let rest_catalog = RestCatalog::new(config).await.unwrap();
+    let rest_catalog = RestCatalog::new(config);
 
     TestFixture {
         _docker_compose: docker_compose,
diff --git a/crates/examples/src/rest_catalog_namespace.rs b/crates/examples/src/rest_catalog_namespace.rs
index 0a3b00b..3716899 100644
--- a/crates/examples/src/rest_catalog_namespace.rs
+++ b/crates/examples/src/rest_catalog_namespace.rs
@@ -27,7 +27,7 @@ async fn main() {
         .uri("http://localhost:8080".to_string())
         .build();
 
-    let catalog = RestCatalog::new(config).await.unwrap();
+    let catalog = RestCatalog::new(config);
     // ANCHOR_END: create_catalog
 
     // ANCHOR: list_all_namespace
diff --git a/crates/examples/src/rest_catalog_table.rs b/crates/examples/src/rest_catalog_table.rs
index 9fb3dd7..f25ce45 100644
--- a/crates/examples/src/rest_catalog_table.rs
+++ b/crates/examples/src/rest_catalog_table.rs
@@ -27,7 +27,7 @@ async fn main() {
         .uri("http://localhost:8080".to_string())
         .build();
 
-    let catalog = RestCatalog::new(config).await.unwrap();
+    let catalog = RestCatalog::new(config);
 
     // ANCHOR: create_table
     let table_id = TableIdent::from_strs(["default", "t1"]).unwrap();

Reply via email to