This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 875349c1 refactor: REST `Catalog` implementation (#965)
875349c1 is described below

commit 875349c17dbf6c8d80f87436755089db7e4eaae6
Author: Connor Tsui <[email protected]>
AuthorDate: Mon Mar 10 01:10:48 2025 -0400

    refactor: REST `Catalog` implementation (#965)
    
    Followup of #962
    
    #962 introduced a bug where some of the methods allow both
    `StatusCode::OK` and `StatusCode::NO_CONTENT` as success cases, when in
    reality it should be one or the other (this was me, sorry about that).
    
    This PR attempts to unify the 3 different types of response helpers that
    essentially all do the exact same thing slightly differently. The main
    addition here is a function `query_catalog`:
    
    ```rust
        // Queries the Iceberg REST catalog with the given `Request` and a 
provided handler.
        pub async fn query_catalog<R, H, Fut>(&self, mut request: Request, 
handler: H) -> Result<R>
        where
            R: DeserializeOwned,
            H: FnOnce(Response) -> Fut,
            Fut: Future<Output = Result<R>>,
        {
            self.authenticate(&mut request).await?;
    
            let response = self.client.execute(request).await?;
    
            handler(response).await
        }
    ```
    
    By allowing each `Catalog` method to specify how it wants to handle its
    responses, we get much finer control over the success/error cases as well
    as the error messages. Previously, there were 3 functions that all did
    similar things:
    
    ```rust
        pub async fn query<R: DeserializeOwned, E: DeserializeOwned + 
Into<Error>>(
            &self,
            mut request: Request,
        ) -> Result<R> {
    
        pub async fn execute<E: DeserializeOwned + Into<Error>>(
            &self,
            mut request: Request,
        ) -> Result<()> {
    
        pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
            &self,
            mut request: Request,
            handler: impl FnOnce(&Response) -> Option<R>,
        ) -> Result<R> {
    ```
    
    I'm also somewhat using this as a chance to refactor some other parts of
    this crate, mainly documentation and examples.
    
    @Xuanwo It would be great if I could get feedback on some of these
    proposed changes before I keep going!
---
 crates/catalog/rest/src/catalog.rs             | 542 ++++++++++++++-----------
 crates/catalog/rest/src/client.rs              | 130 ++----
 crates/catalog/rest/tests/rest_catalog_test.rs |   5 +-
 crates/examples/src/rest_catalog_namespace.rs  |  52 ++-
 crates/examples/src/rest_catalog_table.rs      |  64 +--
 5 files changed, 404 insertions(+), 389 deletions(-)

diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 7d7f10d2..8df5a4bd 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This module contains rest catalog implementation.
+//! This module contains the iceberg REST catalog implementation.
 
 use std::collections::HashMap;
 use std::str::FromStr;
@@ -35,9 +35,11 @@ use reqwest::{Method, StatusCode, Url};
 use tokio::sync::OnceCell;
 use typed_builder::TypedBuilder;
 
-use crate::client::HttpClient;
+use crate::client::{
+    deserialize_catalog_response, deserialize_unexpected_catalog_error, 
HttpClient,
+};
 use crate::types::{
-    CatalogConfig, CommitTableRequest, CommitTableResponse, 
CreateTableRequest, ErrorResponse,
+    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
     ListNamespaceResponse, ListTableResponse, LoadTableResponse, 
NamespaceSerde,
     RenameTableRequest,
 };
@@ -50,6 +52,7 @@ const PATH_V1: &str = "v1";
 #[derive(Clone, Debug, TypedBuilder)]
 pub struct RestCatalogConfig {
     uri: String,
+
     #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
     warehouse: Option<String>,
 
@@ -105,13 +108,13 @@ impl RestCatalogConfig {
 
     /// Get the token from the config.
     ///
-    /// Client will use `token` to send requests if exists.
+    /// The client can use this token to send requests.
     pub(crate) fn token(&self) -> Option<String> {
         self.props.get("token").cloned()
     }
 
-    /// Get the credentials from the config. Client will use `credential`
-    /// to fetch a new token if exists.
+    /// Get the credentials from the config. The client can use these 
credentials to fetch a new
+    /// token.
     ///
     /// ## Output
     ///
@@ -129,14 +132,12 @@ impl RestCatalogConfig {
         }
     }
 
-    /// Get the extra headers from config.
-    ///
-    /// We will include:
+    /// Get the extra headers from config, which includes:
     ///
     /// - `content-type`
     /// - `x-client-version`
-    /// - `user-agnet`
-    /// - all headers specified by `header.xxx` in props.
+    /// - `user-agent`
+    /// - All headers specified by `header.xxx` in props.
     pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
         let mut headers = HeaderMap::from_iter([
             (
@@ -156,9 +157,7 @@ impl RestCatalogConfig {
         for (key, value) in self
             .props
             .iter()
-            .filter(|(k, _)| k.starts_with("header."))
-            // The unwrap here is same since we are filtering the keys
-            .map(|(k, v)| (k.strip_prefix("header.").unwrap(), v))
+            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
         {
             headers.insert(
                 HeaderName::from_str(key).map_err(|e| {
@@ -181,7 +180,7 @@ impl RestCatalogConfig {
         Ok(headers)
     }
 
-    /// Get the optional oauth headers from the config.
+    /// Get the optional OAuth headers from the config.
     pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
         let mut params = HashMap::new();
 
@@ -197,10 +196,11 @@ impl RestCatalogConfig {
                 params.insert(param_name.to_string(), value.to_string());
             }
         }
+
         params
     }
 
-    /// Merge the config with the given config fetched from rest server.
+    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched 
from the REST server).
     pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> 
Self {
         if let Some(uri) = config.overrides.remove("uri") {
             self.uri = uri;
@@ -218,15 +218,12 @@ impl RestCatalogConfig {
 #[derive(Debug)]
 struct RestContext {
     client: HttpClient,
-
     /// Runtime config is fetched from rest server and stored here.
     ///
     /// It's could be different from the user config.
     config: RestCatalogConfig,
 }
 
-impl RestContext {}
-
 /// Rest catalog implementation.
 #[derive(Debug)]
 pub struct RestCatalog {
@@ -238,7 +235,7 @@ pub struct RestCatalog {
 }
 
 impl RestCatalog {
-    /// Creates a rest catalog from config.
+    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
     pub fn new(config: RestCatalogConfig) -> Self {
         Self {
             user_config: config,
@@ -246,7 +243,7 @@ impl RestCatalog {
         }
     }
 
-    /// Get the context from the catalog.
+    /// Gets the [`RestContext`] from the catalog.
     async fn context(&self) -> Result<&RestContext> {
         self.ctx
             .get_or_try_init(|| async {
@@ -260,24 +257,27 @@ impl RestCatalog {
             .await
     }
 
-    /// Load the runtime config from the server by user_config.
+    /// Load the runtime config from the server by `user_config`.
     ///
-    /// It's required for a rest catalog to update it's config after creation.
+    /// It's required for a REST catalog to update its config after creation.
     async fn load_config(
         client: &HttpClient,
         user_config: &RestCatalogConfig,
     ) -> Result<CatalogConfig> {
-        let mut request = client.request(Method::GET, 
user_config.config_endpoint());
+        let mut request_builder = client.request(Method::GET, 
user_config.config_endpoint());
 
         if let Some(warehouse_location) = &user_config.warehouse {
-            request = request.query(&[("warehouse", warehouse_location)]);
+            request_builder = request_builder.query(&[("warehouse", 
warehouse_location)]);
         }
 
-        let config = client
-            .query::<CatalogConfig, ErrorResponse>(request.build()?)
-            .await?;
+        let request = request_builder.build()?;
+
+        let http_response = client.query_catalog(request).await?;
 
-        Ok(config)
+        match http_response.status() {
+            StatusCode::OK => 
deserialize_catalog_response(http_response).await,
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
     async fn load_file_io(
@@ -312,90 +312,118 @@ impl RestCatalog {
     }
 }
 
+/// All requests and expected responses are derived from the REST catalog API 
spec:
+/// 
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
 #[async_trait]
 impl Catalog for RestCatalog {
-    /// List namespaces from table.
     async fn list_namespaces(
         &self,
         parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
-        let mut request = self.context().await?.client.request(
-            Method::GET,
-            self.context().await?.config.namespaces_endpoint(),
-        );
-        if let Some(ns) = parent {
-            request = request.query(&[("parent", ns.to_url_string())]);
-        }
+        let context = self.context().await?;
 
-        let resp = self
-            .context()
-            .await?
+        let mut request_builder = context
             .client
-            .query::<ListNamespaceResponse, ErrorResponse>(request.build()?)
-            .await?;
-
-        resp.namespaces
-            .into_iter()
-            .map(NamespaceIdent::from_vec)
-            .collect::<Result<Vec<NamespaceIdent>>>()
+            .request(Method::GET, context.config.namespaces_endpoint());
+        // Filter on `parent={namespace}` if a parent namespace exists.
+        if let Some(namespace) = parent {
+            request_builder = request_builder.query(&[("parent", 
namespace.to_url_string())]);
+        }
+        let request = request_builder.build()?;
+
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::OK => {
+                let response =
+                    
deserialize_catalog_response::<ListNamespaceResponse>(http_response).await?;
+                response
+                    .namespaces
+                    .into_iter()
+                    .map(NamespaceIdent::from_vec)
+                    .collect::<Result<Vec<NamespaceIdent>>>()
+            }
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "The parent parameter of the namespace provided does not 
exist",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
-    /// Create a new namespace inside the catalog.
     async fn create_namespace(
         &self,
         namespace: &NamespaceIdent,
         properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::POST,
-                self.context().await?.config.namespaces_endpoint(),
-            )
+            .request(Method::POST, context.config.namespaces_endpoint())
             .json(&NamespaceSerde {
                 namespace: namespace.as_ref().clone(),
                 properties: Some(properties),
             })
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
-            .client
-            .query::<NamespaceSerde, ErrorResponse>(request)
-            .await?;
+        let http_response = context.client.query_catalog(request).await?;
 
-        Namespace::try_from(resp)
+        match http_response.status() {
+            StatusCode::OK => {
+                let response =
+                    
deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
+                Namespace::try_from(response)
+            }
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to create a namespace that already exists",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
-    /// Get a namespace information from the catalog.
     async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::GET,
-                self.context().await?.config.namespace_endpoint(namespace),
-            )
+            .request(Method::GET, context.config.namespace_endpoint(namespace))
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::OK => {
+                let response =
+                    
deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
+                Namespace::try_from(response)
+            }
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to get a namespace that does not exist",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
+    }
+
+    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .query::<NamespaceSerde, ErrorResponse>(request)
-            .await?;
-        Namespace::try_from(resp)
+            .request(Method::HEAD, context.config.namespace_endpoint(ns))
+            .build()?;
+
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
+            StatusCode::NOT_FOUND => Ok(false),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
-    /// Update a namespace inside the catalog.
-    ///
-    /// # Behavior
-    ///
-    /// The properties must be the full set of namespace.
     async fn update_namespace(
         &self,
         _namespace: &NamespaceIdent,
@@ -407,67 +435,48 @@ impl Catalog for RestCatalog {
         ))
     }
 
-    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
-        let request = self
-            .context()
-            .await?
-            .client
-            .request(
-                Method::HEAD,
-                self.context().await?.config.namespace_endpoint(ns),
-            )
-            .build()?;
-
-        self.context()
-            .await?
-            .client
-            .do_execute::<bool, ErrorResponse>(request, |resp| match 
resp.status() {
-                StatusCode::OK | StatusCode::NO_CONTENT => Some(true),
-                StatusCode::NOT_FOUND => Some(false),
-                _ => None,
-            })
-            .await
-    }
-
-    /// Drop a namespace from the catalog.
     async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::DELETE,
-                self.context().await?.config.namespace_endpoint(namespace),
-            )
+            .request(Method::DELETE, 
context.config.namespace_endpoint(namespace))
             .build()?;
 
-        self.context()
-            .await?
-            .client
-            .execute::<ErrorResponse>(request)
-            .await
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to drop a namespace that does not exist",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
-    /// List tables from namespace.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::GET,
-                self.context().await?.config.tables_endpoint(namespace),
-            )
+            .request(Method::GET, context.config.tables_endpoint(namespace))
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
-            .client
-            .query::<ListTableResponse, ErrorResponse>(request)
-            .await?;
+        let http_response = context.client.query_catalog(request).await?;
 
-        Ok(resp.identifiers)
+        match http_response.status() {
+            StatusCode::OK => Ok(
+                
deserialize_catalog_response::<ListTableResponse>(http_response)
+                    .await?
+                    .identifiers,
+            ),
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to list tables of a namespace that does not exist",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
     /// Create a new table inside the namespace.
@@ -481,23 +490,19 @@ impl Catalog for RestCatalog {
         namespace: &NamespaceIdent,
         creation: TableCreation,
     ) -> Result<Table> {
+        let context = self.context().await?;
+
         let table_ident = TableIdent::new(namespace.clone(), 
creation.name.clone());
 
-        let request = self
-            .context()
-            .await?
+        let request = context
             .client
-            .request(
-                Method::POST,
-                self.context().await?.config.tables_endpoint(namespace),
-            )
+            .request(Method::POST, context.config.tables_endpoint(namespace))
             .json(&CreateTableRequest {
                 name: creation.name,
                 location: creation.location,
                 schema: creation.schema,
                 partition_spec: creation.partition_spec,
                 write_order: creation.sort_order,
-                // We don't support stage create yet.
                 stage_create: Some(false),
                 properties: if creation.properties.is_empty() {
                     None
@@ -507,14 +512,33 @@ impl Catalog for RestCatalog {
             })
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
-            .client
-            .query::<LoadTableResponse, ErrorResponse>(request)
-            .await?;
+        let http_response = context.client.query_catalog(request).await?;
 
-        let config = resp
+        let response = match http_response.status() {
+            StatusCode::OK => {
+                
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
+            }
+            StatusCode::NOT_FOUND => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Tried to create a table under a namespace that does not 
exist",
+                ))
+            }
+            StatusCode::CONFLICT => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "The table already exists",
+                ))
+            }
+            _ => return 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        };
+
+        let metadata_location = 
response.metadata_location.as_ref().ok_or(Error::new(
+            ErrorKind::DataInvalid,
+            "Metadata location missing in `create_table` response!",
+        ))?;
+
+        let config = response
             .config
             .unwrap_or_default()
             .into_iter()
@@ -522,47 +546,51 @@ impl Catalog for RestCatalog {
             .collect();
 
         let file_io = self
-            .load_file_io(resp.metadata_location.as_deref(), Some(config))
+            .load_file_io(Some(metadata_location), Some(config))
             .await?;
 
-        Table::builder()
-            .identifier(table_ident)
+        let table_builder = Table::builder()
+            .identifier(table_ident.clone())
             .file_io(file_io)
-            .metadata(resp.metadata)
-            .metadata_location(resp.metadata_location.ok_or_else(|| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Metadata location missing in create table response!",
-                )
-            })?)
-            .build()
+            .metadata(response.metadata);
+
+        if let Some(metadata_location) = response.metadata_location {
+            table_builder.metadata_location(metadata_location).build()
+        } else {
+            table_builder.build()
+        }
     }
 
     /// Load table from the catalog.
     ///
-    /// If there are any config properties that are present in
-    /// both the response from the REST server and the config provided
-    /// when creating this `RestCatalog` instance then the value
+    /// If there are any config properties that are present in both the 
response from the REST
+    /// server and the config provided when creating this `RestCatalog` 
instance, then the value
     /// provided locally to the `RestCatalog` will take precedence.
-    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
-        let request = self
-            .context()
-            .await?
+
+    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::GET,
-                self.context().await?.config.table_endpoint(table),
-            )
+            .request(Method::GET, context.config.table_endpoint(table_ident))
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
-            .client
-            .query::<LoadTableResponse, ErrorResponse>(request)
-            .await?;
+        let http_response = context.client.query_catalog(request).await?;
+
+        let response = match http_response.status() {
+            StatusCode::OK | StatusCode::NOT_MODIFIED => {
+                
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
+            }
+            StatusCode::NOT_FOUND => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Tried to load a table that does not exist",
+                ))
+            }
+            _ => return 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        };
 
-        let config = resp
+        let config = response
             .config
             .unwrap_or_default()
             .into_iter()
@@ -570,15 +598,15 @@ impl Catalog for RestCatalog {
             .collect();
 
         let file_io = self
-            .load_file_io(resp.metadata_location.as_deref(), Some(config))
+            .load_file_io(response.metadata_location.as_deref(), Some(config))
             .await?;
 
         let table_builder = Table::builder()
-            .identifier(table.clone())
+            .identifier(table_ident.clone())
             .file_io(file_io)
-            .metadata(resp.metadata);
+            .metadata(response.metadata);
 
-        if let Some(metadata_location) = resp.metadata_location {
+        if let Some(metadata_location) = response.metadata_location {
             table_builder.metadata_location(metadata_location).build()
         } else {
             table_builder.build()
@@ -587,81 +615,80 @@ impl Catalog for RestCatalog {
 
     /// Drop a table from the catalog.
     async fn drop_table(&self, table: &TableIdent) -> Result<()> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::DELETE,
-                self.context().await?.config.table_endpoint(table),
-            )
+            .request(Method::DELETE, context.config.table_endpoint(table))
             .build()?;
 
-        self.context()
-            .await?
-            .client
-            .execute::<ErrorResponse>(request)
-            .await
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to drop a table that does not exist",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
     /// Check if a table exists in the catalog.
     async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::HEAD,
-                self.context().await?.config.table_endpoint(table),
-            )
+            .request(Method::HEAD, context.config.table_endpoint(table))
             .build()?;
 
-        self.context()
-            .await?
-            .client
-            .do_execute::<bool, ErrorResponse>(request, |resp| match 
resp.status() {
-                StatusCode::OK | StatusCode::NO_CONTENT => Some(true),
-                StatusCode::NOT_FOUND => Some(false),
-                _ => None,
-            })
-            .await
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
+            StatusCode::NOT_FOUND => Ok(false),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
     /// Rename a table in the catalog.
     async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> 
Result<()> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
-            .request(
-                Method::POST,
-                self.context().await?.config.rename_table_endpoint(),
-            )
+            .request(Method::POST, context.config.rename_table_endpoint())
             .json(&RenameTableRequest {
                 source: src.clone(),
                 destination: dest.clone(),
             })
             .build()?;
 
-        self.context()
-            .await?
-            .client
-            .execute::<ErrorResponse>(request)
-            .await
+        let http_response = context.client.query_catalog(request).await?;
+
+        match http_response.status() {
+            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+            StatusCode::NOT_FOUND => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to rename a table that does not exist (is the namespace 
correct?)",
+            )),
+            StatusCode::CONFLICT => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Tried to rename a table to a name that already exists",
+            )),
+            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        }
     }
 
-    /// Update table.
     async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
-        let request = self
-            .context()
-            .await?
+        let context = self.context().await?;
+
+        let request = context
             .client
             .request(
                 Method::POST,
-                self.context()
-                    .await?
-                    .config
-                    .table_endpoint(commit.identifier()),
+                context.config.table_endpoint(commit.identifier()),
             )
             .json(&CommitTableRequest {
                 identifier: commit.identifier().clone(),
@@ -670,21 +697,54 @@ impl Catalog for RestCatalog {
             })
             .build()?;
 
-        let resp = self
-            .context()
-            .await?
-            .client
-            .query::<CommitTableResponse, ErrorResponse>(request)
-            .await?;
+        let http_response = context.client.query_catalog(request).await?;
+
+        let response: CommitTableResponse = match http_response.status() {
+            StatusCode::OK => {
+                deserialize_catalog_response(http_response).await?
+            }
+            StatusCode::NOT_FOUND => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Tried to update a table that does not exist",
+                ))
+            }
+            StatusCode::CONFLICT => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "CommitFailedException, one or more requirements failed. 
The client may retry.",
+                ))
+            }
+            StatusCode::INTERNAL_SERVER_ERROR => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "An unknown server-side problem occurred; the commit state 
is unknown.",
+                ))
+            }
+            StatusCode::BAD_GATEWAY => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "A gateway or proxy received an invalid response from the 
upstream server; the commit state is unknown.",
+                ))
+            }
+            StatusCode::GATEWAY_TIMEOUT => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "A server-side gateway timeout occurred; the commit state 
is unknown.",
+                ))
+            }
+            _ => return 
Err(deserialize_unexpected_catalog_error(http_response).await),
+        };
 
         let file_io = self
-            .load_file_io(Some(&resp.metadata_location), None)
+            .load_file_io(Some(&response.metadata_location), None)
             .await?;
+
         Table::builder()
             .identifier(commit.identifier().clone())
             .file_io(file_io)
-            .metadata(resp.metadata)
-            .metadata_location(resp.metadata_location)
+            .metadata(response.metadata)
+            .metadata_location(response.metadata_location)
             .build()
     }
 }
@@ -1408,11 +1468,7 @@ mod tests {
             .await;
 
         assert!(table.is_err());
-        assert!(table
-            .err()
-            .unwrap()
-            .message()
-            .contains("Table does not exist"));
+        assert!(table.err().unwrap().message().contains("does not exist"));
 
         config_mock.assert_async().await;
         rename_table_mock.assert_async().await;
@@ -1609,7 +1665,7 @@ mod tests {
             .err()
             .unwrap()
             .message()
-            .contains("Table already exists"));
+            .contains("already exists"));
 
         config_mock.assert_async().await;
         create_table_mock.assert_async().await;
@@ -1789,7 +1845,7 @@ mod tests {
             .err()
             .unwrap()
             .message()
-            .contains("The given table does not exist"));
+            .contains("does not exist"));
 
         config_mock.assert_async().await;
         update_table_mock.assert_async().await;
diff --git a/crates/catalog/rest/src/client.rs 
b/crates/catalog/rest/src/client.rs
index 55ef47f0..dde75276 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -217,108 +217,42 @@ impl HttpClient {
         self.client.request(method, url)
     }
 
-    pub async fn query<R: DeserializeOwned, E: DeserializeOwned + Into<Error>>(
-        &self,
-        mut request: Request,
-    ) -> Result<R> {
+    // Queries the Iceberg REST catalog after authentication with the given 
`Request` and
+    // returns a `Response`.
+    pub async fn query_catalog(&self, mut request: Request) -> 
Result<Response> {
         self.authenticate(&mut request).await?;
-
-        let method = request.method().clone();
-        let url = request.url().clone();
-        let response = self.client.execute(request).await?;
-
-        if response.status() == StatusCode::OK {
-            let text = response
-                .bytes()
-                .await
-                .map_err(|err| err.with_url(url.clone()))?;
-            Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "Failed to parse response from rest catalog server!",
-                )
-                .with_context("method", method.to_string())
-                .with_context("url", url.to_string())
-                .with_context("json", String::from_utf8_lossy(&text))
-                .with_source(e)
-            })?)
-        } else {
-            let code = response.status();
-            let text = response
-                .bytes()
-                .await
-                .map_err(|err| err.with_url(url.clone()))?;
-            let e = serde_json::from_slice::<E>(&text).map_err(|e| {
-                Error::new(ErrorKind::Unexpected, "Received unexpected 
response")
-                    .with_context("code", code.to_string())
-                    .with_context("method", method.to_string())
-                    .with_context("url", url.to_string())
-                    .with_context("json", String::from_utf8_lossy(&text))
-                    .with_source(e)
-            })?;
-            Err(e.into())
-        }
+        Ok(self.client.execute(request).await?)
     }
+}
 
-    pub async fn execute<E: DeserializeOwned + Into<Error>>(
-        &self,
-        mut request: Request,
-    ) -> Result<()> {
-        self.authenticate(&mut request).await?;
-
-        let method = request.method().clone();
-        let url = request.url().clone();
-        let response = self.client.execute(request).await?;
-
-        match response.status() {
-            StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
-            code => {
-                let text = response
-                    .bytes()
-                    .await
-                    .map_err(|err| err.with_url(url.clone()))?;
-                let e = serde_json::from_slice::<E>(&text).map_err(|e| {
-                    Error::new(ErrorKind::Unexpected, "Received unexpected 
response")
-                        .with_context("code", code.to_string())
-                        .with_context("method", method.to_string())
-                        .with_context("url", url.to_string())
-                        .with_context("json", String::from_utf8_lossy(&text))
-                        .with_source(e)
-                })?;
-                Err(e.into())
-            }
-        }
-    }
+/// Deserializes a catalog response into the given [`DeserializeOwned`] type.
+///
+/// Returns an error if unable to parse the response bytes.
+pub(crate) async fn deserialize_catalog_response<R: DeserializeOwned>(
+    response: Response,
+) -> Result<R> {
+    let bytes = response.bytes().await?;
 
-    /// More generic logic handling for special cases like head.
-    pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
-        &self,
-        mut request: Request,
-        handler: impl FnOnce(&Response) -> Option<R>,
-    ) -> Result<R> {
-        self.authenticate(&mut request).await?;
+    serde_json::from_slice::<R>(&bytes).map_err(|e| {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Failed to parse response from rest catalog server",
+        )
+        .with_context("json", String::from_utf8_lossy(&bytes))
+        .with_source(e)
+    })
+}
 
-        let method = request.method().clone();
-        let url = request.url().clone();
-        let response = self.client.execute(request).await?;
+/// Deserializes an unexpected catalog response into an error.
+///
+/// TODO: Eventually, this function should return an error response that is 
custom to the error
+/// codes that all endpoints share (400, 404, etc.).
+pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) 
-> Error {
+    let bytes = match response.bytes().await {
+        Ok(bytes) => bytes,
+        Err(err) => return err.into(),
+    };
 
-        if let Some(ret) = handler(&response) {
-            Ok(ret)
-        } else {
-            let code = response.status();
-            let text = response
-                .bytes()
-                .await
-                .map_err(|err| err.with_url(url.clone()))?;
-            let e = serde_json::from_slice::<E>(&text).map_err(|e| {
-                Error::new(ErrorKind::Unexpected, "Received unexpected 
response")
-                    .with_context("code", code.to_string())
-                    .with_context("method", method.to_string())
-                    .with_context("url", url.to_string())
-                    .with_context("json", String::from_utf8_lossy(&text))
-                    .with_source(e)
-            })?;
-            Err(e.into())
-        }
-    }
+    Error::new(ErrorKind::Unexpected, "Received unexpected response")
+        .with_context("json", String::from_utf8_lossy(&bytes))
 }
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs 
b/crates/catalog/rest/tests/rest_catalog_test.rs
index f70fc2f3..71ec33b5 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -81,10 +81,7 @@ async fn test_get_non_exist_namespace() {
         .await;
 
     assert!(result.is_err());
-    assert!(result
-        .unwrap_err()
-        .to_string()
-        .contains("Namespace does not exist"));
+    assert!(result.unwrap_err().to_string().contains("does not exist"));
 }
 
 #[tokio::test]
diff --git a/crates/examples/src/rest_catalog_namespace.rs 
b/crates/examples/src/rest_catalog_namespace.rs
index 99aaf2e6..8b6e12a5 100644
--- a/crates/examples/src/rest_catalog_namespace.rs
+++ b/crates/examples/src/rest_catalog_namespace.rs
@@ -20,40 +20,50 @@ use std::collections::HashMap;
 use iceberg::{Catalog, NamespaceIdent};
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 
-/// It a simple example that demonstrates how to create a namespace in a REST 
catalog.
-/// It requires a running instance of the iceberg-rest catalog for the port 
8181.
-/// You can find how to run the iceberg-rest catalog in the official 
documentation.
+static REST_URI: &str = "http://localhost:8181";
+
+/// This is a simple example that demonstrates how to use [`RestCatalog`] to 
create namespaces.
+///
+/// The demo creates a namespace and prints it out.
 ///
-/// [Quickstart](https://iceberg.apache.org/spark-quickstart/)
+/// A running instance of the iceberg-rest catalog on port 8181 is required. 
You can find how to run
+/// the iceberg-rest catalog with `docker compose` in the official
+/// [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).
 #[tokio::main]
 async fn main() {
-    // ANCHOR: create_catalog
-    // Create catalog
+    // Create the REST iceberg catalog.
     let config = RestCatalogConfig::builder()
-        .uri("http://localhost:8181".to_string())
+        .uri(REST_URI.to_string())
         .build();
-
     let catalog = RestCatalog::new(config);
-    // ANCHOR_END: create_catalog
 
-    // ANCHOR: list_all_namespace
-    // List all namespaces
-    let all_namespaces = catalog.list_namespaces(None).await.unwrap();
-    println!("Namespaces in current catalog: {:?}", all_namespaces);
-    // ANCHOR_END: list_all_namespace
+    // List all namespaces already in the catalog.
+    let existing_namespaces = catalog.list_namespaces(None).await.unwrap();
+    println!(
+        "Namespaces alreading in the existing catalog: {:?}",
+        existing_namespaces
+    );
 
-    // ANCHOR: create_namespace
-    let namespace_id =
+    // Create a new namespace identifier.
+    let namespace_ident =
         NamespaceIdent::from_vec(vec!["ns1".to_string(), 
"ns11".to_string()]).unwrap();
-    // Create namespace
-    let ns = catalog
+
+    // Drop the namespace if it already exists.
+    if catalog.namespace_exists(&namespace_ident).await.unwrap() {
+        println!("Namespace already exists, dropping now.",);
+        catalog.drop_namespace(&namespace_ident).await.unwrap();
+    }
+
+    // Create the new namespace in the catalog.
+    let _created_namespace = catalog
         .create_namespace(
-            &namespace_id,
+            &namespace_ident,
             HashMap::from([("key1".to_string(), "value1".to_string())]),
         )
         .await
         .unwrap();
+    println!("Namespace {:?} created!", namespace_ident);
 
-    println!("Namespace created: {:?}", ns);
-    // ANCHOR_END: create_namespace
+    let loaded_namespace = 
catalog.get_namespace(&namespace_ident).await.unwrap();
+    println!("Namespace loaded!\n\nNamespace: {:#?}", loaded_namespace,);
 }
diff --git a/crates/examples/src/rest_catalog_table.rs 
b/crates/examples/src/rest_catalog_table.rs
index 17455915..6b8a75c4 100644
--- a/crates/examples/src/rest_catalog_table.rs
+++ b/crates/examples/src/rest_catalog_table.rs
@@ -18,26 +18,42 @@
 use std::collections::HashMap;
 
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::{Catalog, TableCreation, TableIdent};
+use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 
-/// This is a simple example that demonstrates how to create a table in a REST 
catalog and get it back.
-/// It requires a running instance of the iceberg-rest catalog for the port 
8080.
-/// You can find how to run the iceberg-rest catalog in the official 
documentation.
+static REST_URI: &str = "http://localhost:8181";
+static NAMESPACE: &str = "default";
+static TABLE_NAME: &str = "t1";
+
+/// This is a simple example that demonstrates how to use [`RestCatalog`] to 
create tables.
+///
+/// The demo creates a table and then later retrieves the same table.
 ///
-/// [Quickstart](https://iceberg.apache.org/spark-quickstart/)
+/// A running instance of the iceberg-rest catalog on port 8181 is required. 
You can find how to run
+/// the iceberg-rest catalog with `docker compose` in the official
+/// [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).
 #[tokio::main]
 async fn main() {
-    // Create catalog
+    // Create the REST iceberg catalog.
     let config = RestCatalogConfig::builder()
-        .uri("http://localhost:8181".to_string())
+        .uri(REST_URI.to_string())
         .build();
-
     let catalog = RestCatalog::new(config);
 
-    // ANCHOR: create_table
-    let table_id = TableIdent::from_strs(["default", "t1"]).unwrap();
+    // Create the table identifier.
+    let namespace_ident = 
NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
+    let table_ident = TableIdent::new(namespace_ident.clone(), 
TABLE_NAME.to_string());
+
+    // You can also use the `from_strs` method on `TableIdent` to create the 
table identifier.
+    // let table_ident = TableIdent::from_strs([NAMESPACE, 
TABLE_NAME]).unwrap();
 
+    // Drop the table if it already exists.
+    if catalog.table_exists(&table_ident).await.unwrap() {
+        println!("Table {TABLE_NAME} already exists, dropping now.");
+        catalog.drop_table(&table_ident).await.unwrap();
+    }
+
+    // Build the table schema.
     let table_schema = Schema::builder()
         .with_fields(vec![
             NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
@@ -49,26 +65,28 @@ async fn main() {
         .build()
         .unwrap();
 
-    // Create table
+    // Build the table creation parameters.
     let table_creation = TableCreation::builder()
-        .name(table_id.name.clone())
+        .name(table_ident.name.clone())
         .schema(table_schema.clone())
         .properties(HashMap::from([("owner".to_string(), 
"testx".to_string())]))
         .build();
 
-    let table = catalog
-        .create_table(&table_id.namespace, table_creation)
+    // Create the table.
+    let _created_table = catalog
+        .create_table(&table_ident.namespace, table_creation)
         .await
         .unwrap();
+    println!("Table {TABLE_NAME} created!");
 
-    println!("Table created: {:?}", table.metadata());
-    // ANCHOR_END: create_table
-
-    // ANCHOR: load_table
-    let table_created = catalog
-        .load_table(&TableIdent::from_strs(["default", "t1"]).unwrap())
+    // Ensure that the table is under the correct namespace.
+    assert!(catalog
+        .list_tables(&namespace_ident)
         .await
-        .unwrap();
-    println!("{:?}", table_created.metadata());
-    // ANCHOR_END: load_table
+        .unwrap()
+        .contains(&table_ident));
+
+    // Load the table back from the catalog. It should be identical to the 
created table.
+    let loaded_table = catalog.load_table(&table_ident).await.unwrap();
+    println!("Table {TABLE_NAME} loaded!\n\nTable: {:?}", loaded_table);
 }


Reply via email to