This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 875349c1 refactor: REST `Catalog` implementation (#965)
875349c1 is described below
commit 875349c17dbf6c8d80f87436755089db7e4eaae6
Author: Connor Tsui <[email protected]>
AuthorDate: Mon Mar 10 01:10:48 2025 -0400
refactor: REST `Catalog` implementation (#965)
Follow-up to #962
#962 introduced a bug where some of the methods allow both
`StatusCode::OK` and `StatusCode::NO_CONTENT` as success cases, when in
reality it should be one or the other (this was me, sorry about that).
This PR attempts to unify the 3 different types of response helpers that
essentially all do the exact same thing slightly differently. The main
addition here is a function `query_catalog`:
```rust
// Queries the Iceberg REST catalog with the given `Request` and a provided handler.
pub async fn query_catalog<R, H, Fut>(&self, mut request: Request, handler: H) -> Result<R>
where
R: DeserializeOwned,
H: FnOnce(Response) -> Fut,
Fut: Future<Output = Result<R>>,
{
self.authenticate(&mut request).await?;
let response = self.client.execute(request).await?;
handler(response).await
}
```
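By allowing each `Catalog` method to specify how it wants to handle the
responses, we get much finer control over the success/error cases as well
as the error messages. (In the final version of this change, `query_catalog`
takes only the `Request` and returns the raw `Response`. Each `Catalog` method
then matches on the status code itself, using the new `deserialize_catalog_response`
and `deserialize_unexpected_catalog_error` helpers.) Previously, there were 3
functions that all did similar things: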
similar things:
```rust
pub async fn query<R: DeserializeOwned, E: DeserializeOwned + Into<Error>>(
&self,
mut request: Request,
) -> Result<R> {
pub async fn execute<E: DeserializeOwned + Into<Error>>(
&self,
mut request: Request,
) -> Result<()> {
pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
&self,
mut request: Request,
handler: impl FnOnce(&Response) -> Option<R>,
) -> Result<R> {
```
I'm also somewhat using this as a chance to refactor some other parts of
this crate, mainly documentation and examples.
@Xuanwo It would be great if I could get feedback on some of these
proposed changes before I keep going!
---
crates/catalog/rest/src/catalog.rs | 542 ++++++++++++++-----------
crates/catalog/rest/src/client.rs | 130 ++----
crates/catalog/rest/tests/rest_catalog_test.rs | 5 +-
crates/examples/src/rest_catalog_namespace.rs | 52 ++-
crates/examples/src/rest_catalog_table.rs | 64 +--
5 files changed, 404 insertions(+), 389 deletions(-)
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index 7d7f10d2..8df5a4bd 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! This module contains rest catalog implementation.
+//! This module contains the iceberg REST catalog implementation.
use std::collections::HashMap;
use std::str::FromStr;
@@ -35,9 +35,11 @@ use reqwest::{Method, StatusCode, Url};
use tokio::sync::OnceCell;
use typed_builder::TypedBuilder;
-use crate::client::HttpClient;
+use crate::client::{
+ deserialize_catalog_response, deserialize_unexpected_catalog_error,
HttpClient,
+};
use crate::types::{
- CatalogConfig, CommitTableRequest, CommitTableResponse,
CreateTableRequest, ErrorResponse,
+ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
ListNamespaceResponse, ListTableResponse, LoadTableResponse,
NamespaceSerde,
RenameTableRequest,
};
@@ -50,6 +52,7 @@ const PATH_V1: &str = "v1";
#[derive(Clone, Debug, TypedBuilder)]
pub struct RestCatalogConfig {
uri: String,
+
#[builder(default, setter(strip_option(fallback = warehouse_opt)))]
warehouse: Option<String>,
@@ -105,13 +108,13 @@ impl RestCatalogConfig {
/// Get the token from the config.
///
- /// Client will use `token` to send requests if exists.
+ /// The client can use this token to send requests.
pub(crate) fn token(&self) -> Option<String> {
self.props.get("token").cloned()
}
- /// Get the credentials from the config. Client will use `credential`
- /// to fetch a new token if exists.
+ /// Get the credentials from the config. The client can use these
credentials to fetch a new
+ /// token.
///
/// ## Output
///
@@ -129,14 +132,12 @@ impl RestCatalogConfig {
}
}
- /// Get the extra headers from config.
- ///
- /// We will include:
+ /// Get the extra headers from config, which includes:
///
/// - `content-type`
/// - `x-client-version`
- /// - `user-agnet`
- /// - all headers specified by `header.xxx` in props.
+ /// - `user-agent`
+ /// - All headers specified by `header.xxx` in props.
pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
let mut headers = HeaderMap::from_iter([
(
@@ -156,9 +157,7 @@ impl RestCatalogConfig {
for (key, value) in self
.props
.iter()
- .filter(|(k, _)| k.starts_with("header."))
- // The unwrap here is same since we are filtering the keys
- .map(|(k, v)| (k.strip_prefix("header.").unwrap(), v))
+ .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
{
headers.insert(
HeaderName::from_str(key).map_err(|e| {
@@ -181,7 +180,7 @@ impl RestCatalogConfig {
Ok(headers)
}
- /// Get the optional oauth headers from the config.
+ /// Get the optional OAuth headers from the config.
pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
let mut params = HashMap::new();
@@ -197,10 +196,11 @@ impl RestCatalogConfig {
params.insert(param_name.to_string(), value.to_string());
}
}
+
params
}
- /// Merge the config with the given config fetched from rest server.
+ /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched
from the REST server).
pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) ->
Self {
if let Some(uri) = config.overrides.remove("uri") {
self.uri = uri;
@@ -218,15 +218,12 @@ impl RestCatalogConfig {
#[derive(Debug)]
struct RestContext {
client: HttpClient,
-
/// Runtime config is fetched from rest server and stored here.
///
/// It's could be different from the user config.
config: RestCatalogConfig,
}
-impl RestContext {}
-
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
@@ -238,7 +235,7 @@ pub struct RestCatalog {
}
impl RestCatalog {
- /// Creates a rest catalog from config.
+ /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
pub fn new(config: RestCatalogConfig) -> Self {
Self {
user_config: config,
@@ -246,7 +243,7 @@ impl RestCatalog {
}
}
- /// Get the context from the catalog.
+ /// Gets the [`RestContext`] from the catalog.
async fn context(&self) -> Result<&RestContext> {
self.ctx
.get_or_try_init(|| async {
@@ -260,24 +257,27 @@ impl RestCatalog {
.await
}
- /// Load the runtime config from the server by user_config.
+ /// Load the runtime config from the server by `user_config`.
///
- /// It's required for a rest catalog to update it's config after creation.
+ /// It's required for a REST catalog to update its config after creation.
async fn load_config(
client: &HttpClient,
user_config: &RestCatalogConfig,
) -> Result<CatalogConfig> {
- let mut request = client.request(Method::GET,
user_config.config_endpoint());
+ let mut request_builder = client.request(Method::GET,
user_config.config_endpoint());
if let Some(warehouse_location) = &user_config.warehouse {
- request = request.query(&[("warehouse", warehouse_location)]);
+ request_builder = request_builder.query(&[("warehouse",
warehouse_location)]);
}
- let config = client
- .query::<CatalogConfig, ErrorResponse>(request.build()?)
- .await?;
+ let request = request_builder.build()?;
+
+ let http_response = client.query_catalog(request).await?;
- Ok(config)
+ match http_response.status() {
+ StatusCode::OK =>
deserialize_catalog_response(http_response).await,
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
async fn load_file_io(
@@ -312,90 +312,118 @@ impl RestCatalog {
}
}
+/// All requests and expected responses are derived from the REST catalog API
spec:
+///
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
#[async_trait]
impl Catalog for RestCatalog {
- /// List namespaces from table.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
- let mut request = self.context().await?.client.request(
- Method::GET,
- self.context().await?.config.namespaces_endpoint(),
- );
- if let Some(ns) = parent {
- request = request.query(&[("parent", ns.to_url_string())]);
- }
+ let context = self.context().await?;
- let resp = self
- .context()
- .await?
+ let mut request_builder = context
.client
- .query::<ListNamespaceResponse, ErrorResponse>(request.build()?)
- .await?;
-
- resp.namespaces
- .into_iter()
- .map(NamespaceIdent::from_vec)
- .collect::<Result<Vec<NamespaceIdent>>>()
+ .request(Method::GET, context.config.namespaces_endpoint());
+ // Filter on `parent={namespace}` if a parent namespace exists.
+ if let Some(namespace) = parent {
+ request_builder = request_builder.query(&[("parent",
namespace.to_url_string())]);
+ }
+ let request = request_builder.build()?;
+
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::OK => {
+ let response =
+
deserialize_catalog_response::<ListNamespaceResponse>(http_response).await?;
+ response
+ .namespaces
+ .into_iter()
+ .map(NamespaceIdent::from_vec)
+ .collect::<Result<Vec<NamespaceIdent>>>()
+ }
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "The parent parameter of the namespace provided does not
exist",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
- /// Create a new namespace inside the catalog.
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::POST,
- self.context().await?.config.namespaces_endpoint(),
- )
+ .request(Method::POST, context.config.namespaces_endpoint())
.json(&NamespaceSerde {
namespace: namespace.as_ref().clone(),
properties: Some(properties),
})
.build()?;
- let resp = self
- .context()
- .await?
- .client
- .query::<NamespaceSerde, ErrorResponse>(request)
- .await?;
+ let http_response = context.client.query_catalog(request).await?;
- Namespace::try_from(resp)
+ match http_response.status() {
+ StatusCode::OK => {
+ let response =
+
deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
+ Namespace::try_from(response)
+ }
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to create a namespace that already exists",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
- /// Get a namespace information from the catalog.
async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::GET,
- self.context().await?.config.namespace_endpoint(namespace),
- )
+ .request(Method::GET, context.config.namespace_endpoint(namespace))
.build()?;
- let resp = self
- .context()
- .await?
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::OK => {
+ let response =
+
deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
+ Namespace::try_from(response)
+ }
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to get a namespace that does not exist",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
+ }
+
+ async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
+ let context = self.context().await?;
+
+ let request = context
.client
- .query::<NamespaceSerde, ErrorResponse>(request)
- .await?;
- Namespace::try_from(resp)
+ .request(Method::HEAD, context.config.namespace_endpoint(ns))
+ .build()?;
+
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
+ StatusCode::NOT_FOUND => Ok(false),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
- /// Update a namespace inside the catalog.
- ///
- /// # Behavior
- ///
- /// The properties must be the full set of namespace.
async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
@@ -407,67 +435,48 @@ impl Catalog for RestCatalog {
))
}
- async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
- let request = self
- .context()
- .await?
- .client
- .request(
- Method::HEAD,
- self.context().await?.config.namespace_endpoint(ns),
- )
- .build()?;
-
- self.context()
- .await?
- .client
- .do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
- StatusCode::OK | StatusCode::NO_CONTENT => Some(true),
- StatusCode::NOT_FOUND => Some(false),
- _ => None,
- })
- .await
- }
-
- /// Drop a namespace from the catalog.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::DELETE,
- self.context().await?.config.namespace_endpoint(namespace),
- )
+ .request(Method::DELETE,
context.config.namespace_endpoint(namespace))
.build()?;
- self.context()
- .await?
- .client
- .execute::<ErrorResponse>(request)
- .await
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to drop a namespace that does not exist",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
- /// List tables from namespace.
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::GET,
- self.context().await?.config.tables_endpoint(namespace),
- )
+ .request(Method::GET, context.config.tables_endpoint(namespace))
.build()?;
- let resp = self
- .context()
- .await?
- .client
- .query::<ListTableResponse, ErrorResponse>(request)
- .await?;
+ let http_response = context.client.query_catalog(request).await?;
- Ok(resp.identifiers)
+ match http_response.status() {
+ StatusCode::OK => Ok(
+
deserialize_catalog_response::<ListTableResponse>(http_response)
+ .await?
+ .identifiers,
+ ),
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to list tables of a namespace that does not exist",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
/// Create a new table inside the namespace.
@@ -481,23 +490,19 @@ impl Catalog for RestCatalog {
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
+ let context = self.context().await?;
+
let table_ident = TableIdent::new(namespace.clone(),
creation.name.clone());
- let request = self
- .context()
- .await?
+ let request = context
.client
- .request(
- Method::POST,
- self.context().await?.config.tables_endpoint(namespace),
- )
+ .request(Method::POST, context.config.tables_endpoint(namespace))
.json(&CreateTableRequest {
name: creation.name,
location: creation.location,
schema: creation.schema,
partition_spec: creation.partition_spec,
write_order: creation.sort_order,
- // We don't support stage create yet.
stage_create: Some(false),
properties: if creation.properties.is_empty() {
None
@@ -507,14 +512,33 @@ impl Catalog for RestCatalog {
})
.build()?;
- let resp = self
- .context()
- .await?
- .client
- .query::<LoadTableResponse, ErrorResponse>(request)
- .await?;
+ let http_response = context.client.query_catalog(request).await?;
- let config = resp
+ let response = match http_response.status() {
+ StatusCode::OK => {
+
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to create a table under a namespace that does not
exist",
+ ))
+ }
+ StatusCode::CONFLICT => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "The table already exists",
+ ))
+ }
+ _ => return
Err(deserialize_unexpected_catalog_error(http_response).await),
+ };
+
+ let metadata_location =
response.metadata_location.as_ref().ok_or(Error::new(
+ ErrorKind::DataInvalid,
+ "Metadata location missing in `create_table` response!",
+ ))?;
+
+ let config = response
.config
.unwrap_or_default()
.into_iter()
@@ -522,47 +546,51 @@ impl Catalog for RestCatalog {
.collect();
let file_io = self
- .load_file_io(resp.metadata_location.as_deref(), Some(config))
+ .load_file_io(Some(metadata_location), Some(config))
.await?;
- Table::builder()
- .identifier(table_ident)
+ let table_builder = Table::builder()
+ .identifier(table_ident.clone())
.file_io(file_io)
- .metadata(resp.metadata)
- .metadata_location(resp.metadata_location.ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- "Metadata location missing in create table response!",
- )
- })?)
- .build()
+ .metadata(response.metadata);
+
+ if let Some(metadata_location) = response.metadata_location {
+ table_builder.metadata_location(metadata_location).build()
+ } else {
+ table_builder.build()
+ }
}
/// Load table from the catalog.
///
- /// If there are any config properties that are present in
- /// both the response from the REST server and the config provided
- /// when creating this `RestCatalog` instance then the value
+ /// If there are any config properties that are present in both the
response from the REST
+ /// server and the config provided when creating this `RestCatalog`
instance, then the value
/// provided locally to the `RestCatalog` will take precedence.
- async fn load_table(&self, table: &TableIdent) -> Result<Table> {
- let request = self
- .context()
- .await?
+
+ async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::GET,
- self.context().await?.config.table_endpoint(table),
- )
+ .request(Method::GET, context.config.table_endpoint(table_ident))
.build()?;
- let resp = self
- .context()
- .await?
- .client
- .query::<LoadTableResponse, ErrorResponse>(request)
- .await?;
+ let http_response = context.client.query_catalog(request).await?;
+
+ let response = match http_response.status() {
+ StatusCode::OK | StatusCode::NOT_MODIFIED => {
+
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to load a table that does not exist",
+ ))
+ }
+ _ => return
Err(deserialize_unexpected_catalog_error(http_response).await),
+ };
- let config = resp
+ let config = response
.config
.unwrap_or_default()
.into_iter()
@@ -570,15 +598,15 @@ impl Catalog for RestCatalog {
.collect();
let file_io = self
- .load_file_io(resp.metadata_location.as_deref(), Some(config))
+ .load_file_io(response.metadata_location.as_deref(), Some(config))
.await?;
let table_builder = Table::builder()
- .identifier(table.clone())
+ .identifier(table_ident.clone())
.file_io(file_io)
- .metadata(resp.metadata);
+ .metadata(response.metadata);
- if let Some(metadata_location) = resp.metadata_location {
+ if let Some(metadata_location) = response.metadata_location {
table_builder.metadata_location(metadata_location).build()
} else {
table_builder.build()
@@ -587,81 +615,80 @@ impl Catalog for RestCatalog {
/// Drop a table from the catalog.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::DELETE,
- self.context().await?.config.table_endpoint(table),
- )
+ .request(Method::DELETE, context.config.table_endpoint(table))
.build()?;
- self.context()
- .await?
- .client
- .execute::<ErrorResponse>(request)
- .await
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to drop a table that does not exist",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::HEAD,
- self.context().await?.config.table_endpoint(table),
- )
+ .request(Method::HEAD, context.config.table_endpoint(table))
.build()?;
- self.context()
- .await?
- .client
- .do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
- StatusCode::OK | StatusCode::NO_CONTENT => Some(true),
- StatusCode::NOT_FOUND => Some(false),
- _ => None,
- })
- .await
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
+ StatusCode::NOT_FOUND => Ok(false),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
/// Rename a table in the catalog.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) ->
Result<()> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
- .request(
- Method::POST,
- self.context().await?.config.rename_table_endpoint(),
- )
+ .request(Method::POST, context.config.rename_table_endpoint())
.json(&RenameTableRequest {
source: src.clone(),
destination: dest.clone(),
})
.build()?;
- self.context()
- .await?
- .client
- .execute::<ErrorResponse>(request)
- .await
+ let http_response = context.client.query_catalog(request).await?;
+
+ match http_response.status() {
+ StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
+ StatusCode::NOT_FOUND => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to rename a table that does not exist (is the namespace
correct?)",
+ )),
+ StatusCode::CONFLICT => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to rename a table to a name that already exists",
+ )),
+ _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
- /// Update table.
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
- let request = self
- .context()
- .await?
+ let context = self.context().await?;
+
+ let request = context
.client
.request(
Method::POST,
- self.context()
- .await?
- .config
- .table_endpoint(commit.identifier()),
+ context.config.table_endpoint(commit.identifier()),
)
.json(&CommitTableRequest {
identifier: commit.identifier().clone(),
@@ -670,21 +697,54 @@ impl Catalog for RestCatalog {
})
.build()?;
- let resp = self
- .context()
- .await?
- .client
- .query::<CommitTableResponse, ErrorResponse>(request)
- .await?;
+ let http_response = context.client.query_catalog(request).await?;
+
+ let response: CommitTableResponse = match http_response.status() {
+ StatusCode::OK => {
+ deserialize_catalog_response(http_response).await?
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to update a table that does not exist",
+ ))
+ }
+ StatusCode::CONFLICT => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "CommitFailedException, one or more requirements failed.
The client may retry.",
+ ))
+ }
+ StatusCode::INTERNAL_SERVER_ERROR => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "An unknown server-side problem occurred; the commit state
is unknown.",
+ ))
+ }
+ StatusCode::BAD_GATEWAY => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "A gateway or proxy received an invalid response from the
upstream server; the commit state is unknown.",
+ ))
+ }
+ StatusCode::GATEWAY_TIMEOUT => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "A server-side gateway timeout occurred; the commit state
is unknown.",
+ ))
+ }
+ _ => return
Err(deserialize_unexpected_catalog_error(http_response).await),
+ };
let file_io = self
- .load_file_io(Some(&resp.metadata_location), None)
+ .load_file_io(Some(&response.metadata_location), None)
.await?;
+
Table::builder()
.identifier(commit.identifier().clone())
.file_io(file_io)
- .metadata(resp.metadata)
- .metadata_location(resp.metadata_location)
+ .metadata(response.metadata)
+ .metadata_location(response.metadata_location)
.build()
}
}
@@ -1408,11 +1468,7 @@ mod tests {
.await;
assert!(table.is_err());
- assert!(table
- .err()
- .unwrap()
- .message()
- .contains("Table does not exist"));
+ assert!(table.err().unwrap().message().contains("does not exist"));
config_mock.assert_async().await;
rename_table_mock.assert_async().await;
@@ -1609,7 +1665,7 @@ mod tests {
.err()
.unwrap()
.message()
- .contains("Table already exists"));
+ .contains("already exists"));
config_mock.assert_async().await;
create_table_mock.assert_async().await;
@@ -1789,7 +1845,7 @@ mod tests {
.err()
.unwrap()
.message()
- .contains("The given table does not exist"));
+ .contains("does not exist"));
config_mock.assert_async().await;
update_table_mock.assert_async().await;
diff --git a/crates/catalog/rest/src/client.rs
b/crates/catalog/rest/src/client.rs
index 55ef47f0..dde75276 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -217,108 +217,42 @@ impl HttpClient {
self.client.request(method, url)
}
- pub async fn query<R: DeserializeOwned, E: DeserializeOwned + Into<Error>>(
- &self,
- mut request: Request,
- ) -> Result<R> {
+ // Queries the Iceberg REST catalog after authentication with the given
`Request` and
+ // returns a `Response`.
+ pub async fn query_catalog(&self, mut request: Request) ->
Result<Response> {
self.authenticate(&mut request).await?;
-
- let method = request.method().clone();
- let url = request.url().clone();
- let response = self.client.execute(request).await?;
-
- if response.status() == StatusCode::OK {
- let text = response
- .bytes()
- .await
- .map_err(|err| err.with_url(url.clone()))?;
- Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to parse response from rest catalog server!",
- )
- .with_context("method", method.to_string())
- .with_context("url", url.to_string())
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?)
- } else {
- let code = response.status();
- let text = response
- .bytes()
- .await
- .map_err(|err| err.with_url(url.clone()))?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(ErrorKind::Unexpected, "Received unexpected
response")
- .with_context("code", code.to_string())
- .with_context("method", method.to_string())
- .with_context("url", url.to_string())
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?;
- Err(e.into())
- }
+ Ok(self.client.execute(request).await?)
}
+}
- pub async fn execute<E: DeserializeOwned + Into<Error>>(
- &self,
- mut request: Request,
- ) -> Result<()> {
- self.authenticate(&mut request).await?;
-
- let method = request.method().clone();
- let url = request.url().clone();
- let response = self.client.execute(request).await?;
-
- match response.status() {
- StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
- code => {
- let text = response
- .bytes()
- .await
- .map_err(|err| err.with_url(url.clone()))?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(ErrorKind::Unexpected, "Received unexpected
response")
- .with_context("code", code.to_string())
- .with_context("method", method.to_string())
- .with_context("url", url.to_string())
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?;
- Err(e.into())
- }
- }
- }
+/// Deserializes a catalog response into the given [`DeserializedOwned`] type.
+///
+/// Returns an error if unable to parse the response bytes.
+pub(crate) async fn deserialize_catalog_response<R: DeserializeOwned>(
+ response: Response,
+) -> Result<R> {
+ let bytes = response.bytes().await?;
- /// More generic logic handling for special cases like head.
- pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
- &self,
- mut request: Request,
- handler: impl FnOnce(&Response) -> Option<R>,
- ) -> Result<R> {
- self.authenticate(&mut request).await?;
+ serde_json::from_slice::<R>(&bytes).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server",
+ )
+ .with_context("json", String::from_utf8_lossy(&bytes))
+ .with_source(e)
+ })
+}
- let method = request.method().clone();
- let url = request.url().clone();
- let response = self.client.execute(request).await?;
+/// Deserializes a unexpected catalog response into an error.
+///
+/// TODO: Eventually, this function should return an error response that is
custom to the error
+/// codes that all endpoints share (400, 404, etc.).
+pub(crate) async fn deserialize_unexpected_catalog_error(response: Response)
-> Error {
+ let bytes = match response.bytes().await {
+ Ok(bytes) => bytes,
+ Err(err) => return err.into(),
+ };
- if let Some(ret) = handler(&response) {
- Ok(ret)
- } else {
- let code = response.status();
- let text = response
- .bytes()
- .await
- .map_err(|err| err.with_url(url.clone()))?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(ErrorKind::Unexpected, "Received unexpected
response")
- .with_context("code", code.to_string())
- .with_context("method", method.to_string())
- .with_context("url", url.to_string())
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?;
- Err(e.into())
- }
- }
+ Error::new(ErrorKind::Unexpected, "Received unexpected response")
+ .with_context("json", String::from_utf8_lossy(&bytes))
}
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs
b/crates/catalog/rest/tests/rest_catalog_test.rs
index f70fc2f3..71ec33b5 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -81,10 +81,7 @@ async fn test_get_non_exist_namespace() {
.await;
assert!(result.is_err());
- assert!(result
- .unwrap_err()
- .to_string()
- .contains("Namespace does not exist"));
+ assert!(result.unwrap_err().to_string().contains("does not exist"));
}
#[tokio::test]
diff --git a/crates/examples/src/rest_catalog_namespace.rs
b/crates/examples/src/rest_catalog_namespace.rs
index 99aaf2e6..8b6e12a5 100644
--- a/crates/examples/src/rest_catalog_namespace.rs
+++ b/crates/examples/src/rest_catalog_namespace.rs
@@ -20,40 +20,50 @@ use std::collections::HashMap;
use iceberg::{Catalog, NamespaceIdent};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
-/// It a simple example that demonstrates how to create a namespace in a REST
catalog.
-/// It requires a running instance of the iceberg-rest catalog for the port
8181.
-/// You can find how to run the iceberg-rest catalog in the official
documentation.
+static REST_URI: &str = "http://localhost:8181";
+
+/// This is a simple example that demonstrates how to use [`RestCatalog`] to
create namespaces.
+///
+/// The demo creates a namespace and prints it out.
///
-/// [Quickstart](https://iceberg.apache.org/spark-quickstart/)
+/// A running instance of the iceberg-rest catalog on port 8181 is required.
You can find how to run
+/// the iceberg-rest catalog with `docker compose` in the official
+/// [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).
#[tokio::main]
async fn main() {
- // ANCHOR: create_catalog
- // Create catalog
+ // Create the REST iceberg catalog.
let config = RestCatalogConfig::builder()
- .uri("http://localhost:8181".to_string())
+ .uri(REST_URI.to_string())
.build();
-
let catalog = RestCatalog::new(config);
- // ANCHOR_END: create_catalog
- // ANCHOR: list_all_namespace
- // List all namespaces
- let all_namespaces = catalog.list_namespaces(None).await.unwrap();
- println!("Namespaces in current catalog: {:?}", all_namespaces);
- // ANCHOR_END: list_all_namespace
+ // List all namespaces already in the catalog.
+ let existing_namespaces = catalog.list_namespaces(None).await.unwrap();
+ println!(
+ "Namespaces alreading in the existing catalog: {:?}",
+ existing_namespaces
+ );
- // ANCHOR: create_namespace
- let namespace_id =
+ // Create a new namespace identifier.
+ let namespace_ident =
NamespaceIdent::from_vec(vec!["ns1".to_string(),
"ns11".to_string()]).unwrap();
- // Create namespace
- let ns = catalog
+
+ // Drop the namespace if it already exists.
+ if catalog.namespace_exists(&namespace_ident).await.unwrap() {
+ println!("Namespace already exists, dropping now.",);
+ catalog.drop_namespace(&namespace_ident).await.unwrap();
+ }
+
+ // Create the new namespace in the catalog.
+ let _created_namespace = catalog
.create_namespace(
- &namespace_id,
+ &namespace_ident,
HashMap::from([("key1".to_string(), "value1".to_string())]),
)
.await
.unwrap();
+ println!("Namespace {:?} created!", namespace_ident);
- println!("Namespace created: {:?}", ns);
- // ANCHOR_END: create_namespace
+ let loaded_namespace =
catalog.get_namespace(&namespace_ident).await.unwrap();
+ println!("Namespace loaded!\n\nNamespace: {:#?}", loaded_namespace,);
}
diff --git a/crates/examples/src/rest_catalog_table.rs
b/crates/examples/src/rest_catalog_table.rs
index 17455915..6b8a75c4 100644
--- a/crates/examples/src/rest_catalog_table.rs
+++ b/crates/examples/src/rest_catalog_table.rs
@@ -18,26 +18,42 @@
use std::collections::HashMap;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::{Catalog, TableCreation, TableIdent};
+use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
-/// This is a simple example that demonstrates how to create a table in a REST
catalog and get it back.
-/// It requires a running instance of the iceberg-rest catalog for the port
8080.
-/// You can find how to run the iceberg-rest catalog in the official
documentation.
+static REST_URI: &str = "http://localhost:8181";
+static NAMESPACE: &str = "default";
+static TABLE_NAME: &str = "t1";
+
+/// This is a simple example that demonstrates how to use [`RestCatalog`] to
create tables.
+///
+/// The demo creates a table and then later retrieves the same
table.
///
-/// [Quickstart](https://iceberg.apache.org/spark-quickstart/)
+/// A running instance of the iceberg-rest catalog on port 8181 is required.
You can find how to run
+/// the iceberg-rest catalog with `docker compose` in the official
+/// [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).
#[tokio::main]
async fn main() {
- // Create catalog
+ // Create the REST iceberg catalog.
let config = RestCatalogConfig::builder()
- .uri("http://localhost:8181".to_string())
+ .uri(REST_URI.to_string())
.build();
-
let catalog = RestCatalog::new(config);
- // ANCHOR: create_table
- let table_id = TableIdent::from_strs(["default", "t1"]).unwrap();
+ // Create the table identifier.
+ let namespace_ident =
NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
+ let table_ident = TableIdent::new(namespace_ident.clone(),
TABLE_NAME.to_string());
+
+ // You can also use the `from_strs` method on `TableIdent` to create the
table identifier.
+ // let table_ident = TableIdent::from_strs([NAMESPACE,
TABLE_NAME]).unwrap();
+ // Drop the table if it already exists.
+ if catalog.table_exists(&table_ident).await.unwrap() {
+ println!("Table {TABLE_NAME} already exists, dropping now.");
+ catalog.drop_table(&table_ident).await.unwrap();
+ }
+
+ // Build the table schema.
let table_schema = Schema::builder()
.with_fields(vec![
NestedField::optional(1, "foo",
Type::Primitive(PrimitiveType::String)).into(),
@@ -49,26 +65,28 @@ async fn main() {
.build()
.unwrap();
- // Create table
+ // Build the table creation parameters.
let table_creation = TableCreation::builder()
- .name(table_id.name.clone())
+ .name(table_ident.name.clone())
.schema(table_schema.clone())
.properties(HashMap::from([("owner".to_string(),
"testx".to_string())]))
.build();
- let table = catalog
- .create_table(&table_id.namespace, table_creation)
+ // Create the table.
+ let _created_table = catalog
+ .create_table(&table_ident.namespace, table_creation)
.await
.unwrap();
+ println!("Table {TABLE_NAME} created!");
- println!("Table created: {:?}", table.metadata());
- // ANCHOR_END: create_table
-
- // ANCHOR: load_table
- let table_created = catalog
- .load_table(&TableIdent::from_strs(["default", "t1"]).unwrap())
+ // Ensure that the table is under the correct namespace.
+ assert!(catalog
+ .list_tables(&namespace_ident)
.await
- .unwrap();
- println!("{:?}", table_created.metadata());
- // ANCHOR_END: load_table
+ .unwrap()
+ .contains(&table_ident));
+
+ // Load the table back from the catalog. It should be identical to the
created table.
+ let loaded_table = catalog.load_table(&table_ident).await.unwrap();
+ println!("Table {TABLE_NAME} loaded!\n\nTable: {:?}", loaded_table);
}